logger = logging.getLogger(__name__)


async def download_and_extract_content(
    client: GoogleDriveClient,
    file: dict[str, Any],
) -> tuple[str | None, dict[str, Any], str | None]:
    """Download a Google Drive file and extract its content as markdown.

    ETL only -- no DB writes, no indexing, no summarization.

    Args:
        client: Authenticated Google Drive client used to download/export.
        file: Drive file resource dict; expects at least "id", "name" and
            "mimeType", with optional "modifiedTime", "createdTime", "size",
            "webViewLink" and "md5Checksum".

    Returns:
        (markdown_content, drive_metadata, error_message).
        On success error_message is None; on failure markdown_content is None
        and error_message describes the problem.
    """
    file_id = file.get("id")
    file_name = file.get("name", "Unknown")
    mime_type = file.get("mimeType", "")

    # Folders/shortcuts are never downloadable content.
    if should_skip_file(mime_type):
        return None, {}, f"Skipping {mime_type}"
    # FIX: without an id we cannot call the Drive API at all.
    if not file_id:
        return None, {}, "Missing file id"

    logger.info(f"Downloading file for content extraction: {file_name} ({mime_type})")

    drive_metadata: dict[str, Any] = {
        "google_drive_file_id": file_id,
        "google_drive_file_name": file_name,
        "google_drive_mime_type": mime_type,
        "source_connector": "google_drive",
    }
    # Copy optional Drive attributes only when present so downstream
    # md5/modifiedTime change detection can distinguish "unknown" from "empty".
    if "modifiedTime" in file:
        drive_metadata["modified_time"] = file["modifiedTime"]
    if "createdTime" in file:
        drive_metadata["created_time"] = file["createdTime"]
    if "size" in file:
        drive_metadata["file_size"] = file["size"]
    if "webViewLink" in file:
        drive_metadata["web_view_link"] = file["webViewLink"]
    if "md5Checksum" in file:
        drive_metadata["md5_checksum"] = file["md5Checksum"]
    if is_google_workspace_file(mime_type):
        # Workspace files (Docs/Sheets/Slides) have no native binary; record
        # that they are exported (PDF by default) plus their original kind.
        drive_metadata["exported_as"] = "pdf"
        drive_metadata["original_workspace_type"] = mime_type.split(".")[-1]

    temp_file_path = None
    try:
        # Download regular files; export Google Workspace files instead.
        if is_google_workspace_file(mime_type):
            export_mime = get_export_mime_type(mime_type)
            if not export_mime:
                return None, drive_metadata, f"Cannot export Google Workspace type: {mime_type}"
            content_bytes, error = await client.export_google_file(file_id, export_mime)
            if error:
                return None, drive_metadata, error
            extension = ".pdf" if export_mime == "application/pdf" else ".txt"
        else:
            content_bytes, error = await client.download_file(file_id)
            if error:
                return None, drive_metadata, error
            extension = Path(file_name).suffix or ".bin"

        # Spool to a temp file because the ETL parsers operate on paths.
        with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp:
            tmp.write(content_bytes)
            temp_file_path = tmp.name

        # Parse to markdown
        markdown = await _parse_file_to_markdown(temp_file_path, file_name)
        return markdown, drive_metadata, None

    except Exception as e:
        logger.warning(f"Failed to extract content from {file_name}: {e!s}")
        return None, drive_metadata, str(e)
    finally:
        # Best-effort cleanup of the spool file; never mask the real result.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.unlink(temp_file_path)
            except Exception:
                pass


async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
    """Parse a local file to markdown using the configured ETL service.

    Args:
        file_path: Path to the downloaded file on local disk.
        filename: Original file name; its extension selects the parser.

    Returns:
        Markdown text extracted from the file.

    Raises:
        ValueError: If audio transcription produces empty text.
        RuntimeError: If the ETL service returns nothing or is unknown.
    """
    lower = filename.lower()

    # Plain text / markdown: no ETL needed, return the contents verbatim.
    if lower.endswith((".md", ".markdown", ".txt")):
        with open(file_path, encoding="utf-8") as f:
            return f.read()

    # Audio files: route through the configured speech-to-text service.
    if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
        from app.config import config as app_config

        stt_service_type = (
            "local"
            if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
            else "external"
        )
        if stt_service_type == "local":
            from app.services.stt_service import stt_service

            result = stt_service.transcribe_file(file_path)
            text = result.get("text", "")
        else:
            # FIX: import litellm lazily so the local STT path works even
            # when litellm is not installed.
            from litellm import atranscription

            with open(file_path, "rb") as audio_file:
                kwargs: dict[str, Any] = {
                    "model": app_config.STT_SERVICE,
                    "file": audio_file,
                    "api_key": app_config.STT_SERVICE_API_KEY,
                }
                if app_config.STT_SERVICE_API_BASE:
                    kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
                resp = await atranscription(**kwargs)
                text = resp.get("text", "")

        if not text:
            raise ValueError("Transcription returned empty text")
        # FIX: header previously rendered the literal "(unknown)" instead of
        # the actual file name (f-string had no placeholder).
        return f"# Transcription of {filename}\n\n{text}"

    # Document files -- use configured ETL service
    from app.config import config as app_config

    if app_config.ETL_SERVICE == "UNSTRUCTURED":
        from langchain_unstructured import UnstructuredLoader

        from app.utils.document_converters import convert_document_to_markdown

        loader = UnstructuredLoader(
            file_path,
            mode="elements",
            post_processors=[],
            languages=["eng"],
            include_orig_elements=False,
            include_metadata=False,
            strategy="auto",
        )
        docs = await loader.aload()
        return await convert_document_to_markdown(docs)

    if app_config.ETL_SERVICE == "LLAMACLOUD":
        from app.tasks.document_processors.file_processors import (
            parse_with_llamacloud_retry,
        )

        result = await parse_with_llamacloud_retry(file_path=file_path, estimated_pages=50)
        markdown_documents = await result.aget_markdown_documents(split_by_page=False)
        if not markdown_documents:
            # FIX: message previously rendered the literal "(unknown)"
            # instead of the actual file name.
            raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
        return markdown_documents[0].text

    if app_config.ETL_SERVICE == "DOCLING":
        from docling.document_converter import DocumentConverter

        converter = DocumentConverter()
        result = converter.convert(file_path)
        return result.document.export_to_markdown()

    raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")


def compute_identifier_hash(
    document_type_value: str, unique_id: str, search_space_id: int
) -> str:
    """Return a stable SHA-256 hex digest from raw identity components.

    The digest is computed over "type:unique_id:search_space_id", so the
    same source object always maps to the same identifier hash within a
    search space.
    """
    combined = f"{document_type_value}:{unique_id}:{search_space_id}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()


def compute_unique_identifier_hash(doc: ConnectorDocument) -> str:
    """Return a stable SHA-256 hash identifying a document by its source identity."""
    return compute_identifier_hash(doc.document_type.value, doc.unique_id, doc.search_space_id)
a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 490aac782..c6a29f204 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -6,12 +6,13 @@ from sqlalchemy import delete, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession -from app.db import Chunk, Document, DocumentStatus +from app.db import NATIVE_TO_LEGACY_DOCTYPE, Chunk, Document, DocumentStatus from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_chunker import chunk_text from app.indexing_pipeline.document_embedder import embed_texts from app.indexing_pipeline.document_hashing import ( compute_content_hash, + compute_identifier_hash, compute_unique_identifier_hash, ) from app.indexing_pipeline.document_persistence import ( @@ -54,6 +55,62 @@ class IndexingPipelineService: def __init__(self, session: AsyncSession) -> None: self.session = session + async def migrate_legacy_docs( + self, connector_docs: list[ConnectorDocument] + ) -> None: + """Migrate legacy Composio documents to their native Google type. + + For each ConnectorDocument whose document_type has a Composio equivalent + in NATIVE_TO_LEGACY_DOCTYPE, look up the old document by legacy hash and + update its unique_identifier_hash and document_type so that + prepare_for_indexing() can find it under the native hash. 
+ """ + for doc in connector_docs: + legacy_type = NATIVE_TO_LEGACY_DOCTYPE.get(doc.document_type.value) + if not legacy_type: + continue + + legacy_hash = compute_identifier_hash( + legacy_type, doc.unique_id, doc.search_space_id + ) + result = await self.session.execute( + select(Document).filter( + Document.unique_identifier_hash == legacy_hash + ) + ) + existing = result.scalars().first() + if existing is None: + continue + + native_hash = compute_identifier_hash( + doc.document_type.value, doc.unique_id, doc.search_space_id + ) + existing.unique_identifier_hash = native_hash + existing.document_type = doc.document_type + + await self.session.commit() + + async def index_batch( + self, connector_docs: list[ConnectorDocument], llm + ) -> list[Document]: + """Convenience method: prepare_for_indexing then index each document. + + Indexers that need heartbeat callbacks or custom per-document logic + should call prepare_for_indexing() + index() directly instead. + """ + doc_map = { + compute_unique_identifier_hash(cd): cd for cd in connector_docs + } + documents = await self.prepare_for_indexing(connector_docs) + results: list[Document] = [] + for document in documents: + connector_doc = doc_map.get(document.unique_identifier_hash) + if connector_doc is None: + continue + result = await self.index(document, connector_doc, llm) + results.append(result) + return results + async def prepare_for_indexing( self, connector_docs: list[ConnectorDocument] ) -> list[Document]: diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 233bc66e4..a69b33bdc 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -1,9 +1,8 @@ """ Google Calendar connector indexer. 
def _build_connector_doc(
    event: dict,
    event_markdown: str,
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Map a raw Google Calendar API event dict to a ConnectorDocument."""

    def _when(segment: dict) -> str:
        # Timed events carry "dateTime"; all-day events carry "date".
        return segment.get("dateTime") or segment.get("date", "")

    event_id = event.get("id", "")
    title = event.get("summary", "No Title")

    meta = {
        "event_id": event_id,
        "event_summary": title,
        "calendar_id": event.get("calendarId", ""),
        "start_time": _when(event.get("start", {})),
        "end_time": _when(event.get("end", {})),
        "location": event.get("location", ""),
        "connector_id": connector_id,
        "document_type": "Google Calendar Event",
        "connector_type": "Google Calendar",
    }

    return ConnectorDocument(
        title=title,
        source_markdown=event_markdown,
        unique_id=event_id,
        document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=enable_summary,
        # Used verbatim when LLM summarization is disabled or unavailable.
        fallback_summary=f"Google Calendar Event: {title}\n\n{event_markdown}",
        metadata=meta,
    )
not found" - # Build credentials based on connector type + # ── Credential building ─────────────────────────────────────── if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: connected_account_id = connector.config.get("composio_connected_account_id") if not connected_account_id: @@ -184,6 +225,7 @@ async def index_google_calendar_events( ) return 0, 0, "Google Calendar credentials not found in connector config" + # ── Calendar client init ────────────────────────────────────── await task_logger.log_task_progress( log_entry, f"Initializing Google Calendar client for connector {connector_id}", @@ -203,36 +245,26 @@ async def index_google_calendar_events( if end_date == "undefined" or end_date == "": end_date = None - # Calculate date range - # For calendar connectors, allow future dates to index upcoming events + # ── Date range calculation ──────────────────────────────────── if start_date is None or end_date is None: - # Fall back to calculating dates based on last_indexed_at - # Default to today (users can manually select future dates if needed) calculated_end_date = datetime.now() - # Use last_indexed_at as start date if available, otherwise use 30 days ago if connector.last_indexed_at: - # Convert dates to be comparable (both timezone-naive) last_indexed_naive = ( connector.last_indexed_at.replace(tzinfo=None) if connector.last_indexed_at.tzinfo else connector.last_indexed_at ) - - # Allow future dates - use last_indexed_at as start date calculated_start_date = last_indexed_naive logger.info( f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date" ) else: - calculated_start_date = datetime.now() - timedelta( - days=365 - ) # Use 365 days as default for calendar events (matches frontend) + calculated_start_date = datetime.now() - timedelta(days=365) logger.info( f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (365 days ago) as start date" ) - # Use calculated dates if not provided 
start_date_str = ( start_date if start_date else calculated_start_date.strftime("%Y-%m-%d") ) @@ -240,19 +272,14 @@ async def index_google_calendar_events( end_date if end_date else calculated_end_date.strftime("%Y-%m-%d") ) else: - # Use provided dates (including future dates) start_date_str = start_date end_date_str = end_date - # FIX: Ensure end_date is at least 1 day after start_date to avoid - # "start_date must be strictly before end_date" errors when dates are the same - # (e.g., when last_indexed_at is today) if start_date_str == end_date_str: logger.info( f"Start date ({start_date_str}) equals end date ({end_date_str}), " "adjusting end date to next day to ensure valid date range" ) - # Parse end_date and add 1 day try: end_dt = parse_date_flexible(end_date_str) except ValueError: @@ -264,6 +291,7 @@ async def index_google_calendar_events( end_date_str = end_dt.strftime("%Y-%m-%d") logger.info(f"Adjusted end date to {end_date_str}") + # ── Fetch events ────────────────────────────────────────────── await task_logger.log_task_progress( log_entry, f"Fetching Google Calendar events from {start_date_str} to {end_date_str}", @@ -274,27 +302,19 @@ async def index_google_calendar_events( }, ) - # Get events within date range from primary calendar try: events, error = await calendar_client.get_all_primary_calendar_events( start_date=start_date_str, end_date=end_date_str ) if error: - # Don't treat "No events found" as an error that should stop indexing if "No events found" in error: logger.info(f"No Google Calendar events found: {error}") - logger.info( - "No events found is not a critical error, continuing with update" - ) if update_last_indexed: await update_connector_last_indexed( session, connector, update_last_indexed ) await session.commit() - logger.info( - f"Updated last_indexed_at to {connector.last_indexed_at} despite no events found" - ) await task_logger.log_task_success( log_entry, @@ -304,7 +324,6 @@ async def index_google_calendar_events( return 0, 
0, None else: logger.error(f"Failed to get Google Calendar events: {error}") - # Check if this is an authentication error that requires re-authentication error_message = error error_type = "APIError" if ( @@ -329,28 +348,15 @@ async def index_google_calendar_events( logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True) return 0, 0, f"Error fetching Google Calendar events: {e!s}" - documents_indexed = 0 + # ── Build ConnectorDocuments ────────────────────────────────── + connector_docs: list[ConnectorDocument] = [] documents_skipped = 0 - documents_failed = 0 # Track events that failed processing - duplicate_content_count = ( - 0 # Track events skipped due to duplicate content_hash - ) - - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() - - # ======================================================================= - # PHASE 1: Analyze all events, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - events_to_process = [] # List of dicts with document and event data - new_documents_created = False + duplicate_content_count = 0 for event in events: try: event_id = event.get("id") event_summary = event.get("summary", "No Title") - calendar_id = event.get("calendarId", "") if not event_id: logger.warning(f"Skipping event with missing ID: {event_summary}") @@ -363,223 +369,73 @@ async def index_google_calendar_events( documents_skipped += 1 continue - start = event.get("start", {}) - end = event.get("end", {}) - start_time = start.get("dateTime") or start.get("date", "") - end_time = end.get("dateTime") or end.get("date", "") - location = event.get("location", "") - description = event.get("description", "") - - # Generate unique identifier hash for this Google Calendar event - unique_identifier_hash = generate_unique_identifier_hash( - 
DocumentType.GOOGLE_CALENDAR_CONNECTOR, event_id, search_space_id + doc = _build_connector_doc( + event, + event_markdown, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=connector.enable_summary, ) - # Generate content hash - content_hash = generate_content_hash(event_markdown, search_space_id) - - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Fallback: legacy Composio hash - if not existing_document: - legacy_hash = generate_unique_identifier_hash( - DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, - event_id, - search_space_id, - ) - existing_document = await check_document_by_unique_identifier( - session, legacy_hash - ) - if existing_document: - existing_document.unique_identifier_hash = ( - unique_identifier_hash - ) - if ( - existing_document.document_type - == DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR - ): - existing_document.document_type = ( - DocumentType.GOOGLE_CALENDAR_CONNECTOR - ) - logger.info( - f"Migrated legacy Composio Calendar document: {event_id}" - ) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - events_to_process.append( - { - "document": existing_document, - "is_new": False, - "event_markdown": event_markdown, - "content_hash": content_hash, - "event_id": event_id, - "event_summary": event_summary, - "calendar_id": calendar_id, - "start_time": start_time, - "end_time": end_time, - "location": location, - "description": description, - } - ) - continue - - 
# Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash + duplicate = await check_duplicate_document_by_hash( + session, compute_content_hash(doc) ) - - if duplicate_by_content: - # A document with the same content already exists (likely from Composio connector) + if duplicate: logger.info( - f"Event {event_summary} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content." + f"Event {doc.title} already indexed by another connector " + f"(existing document ID: {duplicate.id}, " + f"type: {duplicate.document_type}). Skipping." ) duplicate_content_count += 1 documents_skipped += 1 continue - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=event_summary, - document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR, - document_metadata={ - "event_id": event_id, - "event_summary": event_summary, - "calendar_id": calendar_id, - "start_time": start_time, - "end_time": end_time, - "location": location, - "connector_id": connector_id, - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - events_to_process.append( - { - "document": document, - "is_new": True, - "event_markdown": event_markdown, - "content_hash": content_hash, - "event_id": event_id, - 
"event_summary": event_summary, - "calendar_id": calendar_id, - "start_time": start_time, - "end_time": end_time, - "location": location, - "description": description, - } - ) + connector_docs.append(doc) except Exception as e: - logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True) - documents_failed += 1 + logger.error(f"Error building ConnectorDocument for event: {e!s}", exc_info=True) + documents_skipped += 1 continue - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents" - ) - await session.commit() + # ── Pipeline: migrate legacy docs + prepare + index ─────────── + pipeline = IndexingPipelineService(session) - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(events_to_process)} documents") + await pipeline.migrate_legacy_docs(connector_docs) - for item in events_to_process: - # Send heartbeat periodically + documents = await pipeline.prepare_for_indexing(connector_docs) + + doc_map = { + compute_unique_identifier_hash(cd): cd for cd in connector_docs + } + + documents_indexed = 0 + documents_failed = 0 + last_heartbeat_time = time.time() + + for document in documents: if on_heartbeat_callback: current_time = time.time() if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() + connector_doc = doc_map.get(document.unique_identifier_hash) + if connector_doc is None: + 
logger.warning( + f"No matching ConnectorDocument for document {document.id}, skipping" + ) + documents_failed += 1 + continue - # Heavy processing (LLM, embeddings, chunks) + try: user_llm = await get_user_long_context_llm( session, user_id, search_space_id ) - - if user_llm and connector.enable_summary: - document_metadata_for_summary = { - "event_id": item["event_id"], - "event_summary": item["event_summary"], - "calendar_id": item["calendar_id"], - "start_time": item["start_time"], - "end_time": item["end_time"], - "location": item["location"] or "No location", - "document_type": "Google Calendar Event", - "connector_type": "Google Calendar", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - item["event_markdown"], user_llm, document_metadata_for_summary - ) - else: - summary_content = f"Google Calendar Event: {item['event_summary']}\n\n{item['event_markdown']}" - summary_embedding = embed_text(summary_content) - - chunks = await create_document_chunks(item["event_markdown"]) - - # Update document to READY with actual content - document.title = item["event_summary"] - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "event_id": item["event_id"], - "event_summary": item["event_summary"], - "calendar_id": item["calendar_id"], - "start_time": item["start_time"], - "end_time": item["end_time"], - "location": item["location"], - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "connector_id": connector_id, - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - + await pipeline.index(document, connector_doc, user_llm) documents_indexed += 1 - # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Google Calendar events processed 
so far" @@ -588,21 +444,12 @@ async def index_google_calendar_events( except Exception as e: logger.error(f"Error processing Calendar event: {e!s}", exc_info=True) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) documents_failed += 1 continue - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit for any remaining documents not yet committed in batches logger.info( f"Final commit: Total {documents_indexed} Google Calendar events processed" ) @@ -612,22 +459,18 @@ async def index_google_calendar_events( "Successfully committed all Google Calendar document changes to database" ) except Exception as e: - # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower() ): logger.warning( f"Duplicate content_hash detected during final commit. " - f"This may occur if the same event was indexed by multiple connectors. " f"Rolling back and continuing. 
Error: {e!s}" ) await session.rollback() - # Don't fail the entire task - some documents may have been successfully indexed else: raise - # Build warning message if there were issues warning_parts = [] if duplicate_content_count > 0: warning_parts.append(f"{duplicate_content_count} duplicate") diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 260db0ce6..92c074812 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -1,36 +1,41 @@ -"""Google Drive indexer using Surfsense file processors. +"""Google Drive indexer using the shared IndexingPipelineService. -Implements 2-phase document status updates for real-time UI feedback: -- Phase 1: Create all documents with 'pending' status (visible in UI immediately) -- Phase 2: Process each document: pending → processing → ready/failed +File-level pre-filter (_should_skip_file) handles md5/modifiedTime +checks and rename-only detection. download_and_extract_content() +returns markdown which is fed into ConnectorDocument -> pipeline. 
""" import logging import time from collections.abc import Awaitable, Callable +from sqlalchemy import String, cast, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified from app.config import config from app.connectors.google_drive import ( GoogleDriveClient, categorize_change, - download_and_process_file, + download_and_extract_content, fetch_all_changes, get_file_by_id, get_files_in_folder, get_start_page_token, ) +from app.connectors.google_drive.file_types import should_skip_file as skip_mime from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType +from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService +from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, - get_current_timestamp, update_connector_last_indexed, ) -from app.utils.document_converters import generate_unique_identifier_hash from app.utils.google_credentials import ( COMPOSIO_GOOGLE_CONNECTOR_TYPES, build_composio_credentials, @@ -41,15 +46,423 @@ ACCEPTED_DRIVE_CONNECTOR_TYPES = { SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, } -# Type hint for heartbeat callback HeartbeatCallbackType = Callable[[int], Awaitable[None]] - -# Heartbeat interval in seconds HEARTBEAT_INTERVAL_SECONDS = 30 logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +async def _should_skip_file( + session: AsyncSession, + file: dict, + search_space_id: int, +) -> tuple[bool, str | None]: 
+ """Pre-filter: detect unchanged / rename-only files. + + Returns (should_skip, message). + Side-effects: migrates legacy Composio hashes, updates renames in-place. + """ + file_id = file.get("id") + file_name = file.get("name", "Unknown") + mime_type = file.get("mimeType", "") + + if skip_mime(mime_type): + return True, "folder/shortcut" + if not file_id: + return True, "missing file_id" + + # --- locate existing document --- + primary_hash = compute_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE.value, file_id, search_space_id + ) + existing = await check_document_by_unique_identifier(session, primary_hash) + + if not existing: + legacy_hash = compute_identifier_hash( + DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR.value, file_id, search_space_id + ) + existing = await check_document_by_unique_identifier(session, legacy_hash) + if existing: + existing.unique_identifier_hash = primary_hash + if existing.document_type == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: + existing.document_type = DocumentType.GOOGLE_DRIVE_FILE + logger.info(f"Migrated legacy Composio Drive document: {file_id}") + + if not existing: + result = await session.execute( + select(Document).where( + Document.search_space_id == search_space_id, + Document.document_type.in_([ + DocumentType.GOOGLE_DRIVE_FILE, + DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, + ]), + cast(Document.document_metadata["google_drive_file_id"], String) == file_id, + ) + ) + existing = result.scalar_one_or_none() + if existing: + existing.unique_identifier_hash = primary_hash + if existing.document_type == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: + existing.document_type = DocumentType.GOOGLE_DRIVE_FILE + logger.debug(f"Found legacy doc by metadata for file_id: {file_id}") + + if not existing: + return False, None + + # --- content-change check via md5 / modifiedTime --- + incoming_md5 = file.get("md5Checksum") + incoming_mtime = file.get("modifiedTime") + meta = existing.document_metadata or {} + stored_md5 = 
meta.get("md5_checksum") + stored_mtime = meta.get("modified_time") + + content_unchanged = False + if incoming_md5 and stored_md5: + content_unchanged = incoming_md5 == stored_md5 + elif incoming_md5 and not stored_md5: + return False, None + elif not incoming_md5 and incoming_mtime and stored_mtime: + content_unchanged = incoming_mtime == stored_mtime + elif not incoming_md5: + return False, None + + if not content_unchanged: + return False, None + + # --- rename-only detection --- + old_name = meta.get("FILE_NAME") or meta.get("google_drive_file_name") + if old_name and old_name != file_name: + existing.title = file_name + if not existing.document_metadata: + existing.document_metadata = {} + existing.document_metadata["FILE_NAME"] = file_name + existing.document_metadata["google_drive_file_name"] = file_name + if incoming_mtime: + existing.document_metadata["modified_time"] = incoming_mtime + flag_modified(existing, "document_metadata") + await session.commit() + logger.info(f"Rename-only update: '{old_name}' → '{file_name}'") + return True, f"File renamed: '{old_name}' → '{file_name}'" + + if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + existing.status = DocumentStatus.ready() + return True, "unchanged" + + +def _build_connector_doc( + file: dict, + markdown: str, + drive_metadata: dict, + *, + connector_id: int, + search_space_id: int, + user_id: str, + enable_summary: bool, +) -> ConnectorDocument: + """Build a ConnectorDocument from Drive file metadata + extracted markdown.""" + file_id = file.get("id", "") + file_name = file.get("name", "Unknown") + + metadata = { + **drive_metadata, + "connector_id": connector_id, + "document_type": "Google Drive File", + "connector_type": "Google Drive", + } + + fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}" + + return ConnectorDocument( + title=file_name, + source_markdown=markdown, + unique_id=file_id, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + 
async def _process_single_file(
    drive_client: GoogleDriveClient,
    session: AsyncSession,
    file: dict,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool = True,
) -> tuple[int, int, int]:
    """Download, extract, and index a single Drive file via the pipeline.

    Args:
        drive_client: Authenticated Drive client.
        session: Async DB session (commits are left to the caller).
        file: Drive file resource dict (``id``, ``name``, ``mimeType``, ...).
        connector_id: Owning connector id.
        search_space_id: Target search space id.
        user_id: Id of the user the document is created for.
        enable_summary: Whether the pipeline should summarize the document.

    Returns:
        (indexed, skipped, failed) — exactly one of the three is 1.
    """
    file_name = file.get("name", "Unknown")

    try:
        skip, msg = await _should_skip_file(session, file, search_space_id)
        if skip:
            # A rename-only change was already persisted by _should_skip_file;
            # report it as indexed so progress counters reflect the update.
            if msg and "renamed" in msg.lower():
                return 1, 0, 0
            return 0, 1, 0

        markdown, drive_metadata, error = await download_and_extract_content(
            drive_client, file
        )
        if error or not markdown:
            logger.warning(f"ETL failed for {file_name}: {error}")
            return 0, 1, 0

        doc = _build_connector_doc(
            file,
            markdown,
            drive_metadata,
            connector_id=connector_id,
            search_space_id=search_space_id,
            user_id=user_id,
            enable_summary=enable_summary,
        )

        pipeline = IndexingPipelineService(session)
        documents = await pipeline.prepare_for_indexing([doc])
        if not documents:
            return 0, 1, 0

        from app.indexing_pipeline.document_hashing import compute_unique_identifier_hash

        # The user's LLM is invariant across the loop below — fetch it once
        # instead of re-awaiting the lookup for every prepared document.
        user_llm = await get_user_long_context_llm(session, user_id, search_space_id)

        # Map prepared documents back to their source ConnectorDocument.
        doc_map = {compute_unique_identifier_hash(doc): doc}
        for document in documents:
            connector_doc = doc_map.get(document.unique_identifier_hash)
            if not connector_doc:
                continue
            await pipeline.index(document, connector_doc, user_llm)

        logger.info(f"Successfully indexed Google Drive file: {file_name}")
        return 1, 0, 0

    except Exception as e:
        logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True)
        return 0, 0, 1
async def _index_full_scan(
    drive_client: GoogleDriveClient,
    session: AsyncSession,
    connector: object,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    folder_id: str | None,
    folder_name: str,
    task_logger: TaskLoggingService,
    log_entry: object,
    max_files: int,
    include_subfolders: bool = False,
    on_heartbeat_callback: HeartbeatCallbackType | None = None,
    enable_summary: bool = True,
) -> tuple[int, int]:
    """Full scan indexing of a folder (breadth-first over subfolders).

    Args mirror _index_with_delta_sync; ``connector`` is unused here but kept
    for signature parity between the two scan strategies.

    Returns:
        (indexed, skipped) counts.

    Raises:
        Exception: if listing fails before any file could be processed
            (auth failures get a dedicated re-authentication message).
    """
    await task_logger.log_task_progress(
        log_entry,
        f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
        {"stage": "full_scan", "folder_id": folder_id, "include_subfolders": include_subfolders},
    )

    indexed = 0
    skipped = 0
    failed = 0
    files_processed = 0
    last_heartbeat = time.time()
    # BFS queue of (folder_id, folder_name) pairs.
    folders_to_process = [(folder_id, folder_name)]
    first_error: str | None = None

    while folders_to_process and files_processed < max_files:
        cur_id, cur_name = folders_to_process.pop(0)
        page_token = None

        while files_processed < max_files:
            # NOTE(review): the listing is always called with
            # include_subfolders=True; recursion into queued subfolders is
            # gated below — confirm get_files_in_folder's flag semantics.
            files, next_token, error = await get_files_in_folder(
                drive_client, cur_id, include_subfolders=True, page_token=page_token,
            )
            if error:
                logger.error(f"Error listing files in {cur_name}: {error}")
                if first_error is None:
                    first_error = error
                break
            if not files:
                break

            for file in files:
                if files_processed >= max_files:
                    break

                mime = file.get("mimeType", "")
                if mime == "application/vnd.google-apps.folder":
                    if include_subfolders:
                        folders_to_process.append((file["id"], file.get("name", "Unknown")))
                    continue

                files_processed += 1

                # Periodic heartbeat so long runs don't look stuck.
                if on_heartbeat_callback:
                    now = time.time()
                    if now - last_heartbeat >= HEARTBEAT_INTERVAL_SECONDS:
                        await on_heartbeat_callback(indexed)
                        last_heartbeat = now

                i, s, f = await _process_single_file(
                    drive_client, session, file,
                    connector_id, search_space_id, user_id, enable_summary,
                )
                indexed += i
                skipped += s
                failed += f

                # Commit in batches of 10 newly indexed files. Guarding on
                # `i` avoids re-committing on every skipped file while the
                # indexed count sits at a multiple of 10.
                if i and indexed % 10 == 0:
                    await session.commit()

            page_token = next_token
            if not page_token:
                break

    if not files_processed and first_error:
        err_lower = first_error.lower()
        # Case-insensitive auth detection ("authError", "autherror", ...).
        if "401" in first_error or "invalid credentials" in err_lower or "autherror" in err_lower:
            raise Exception(
                f"Google Drive authentication failed. Please re-authenticate. (Error: {first_error})"
            )
        raise Exception(f"Failed to list Google Drive files: {first_error}")

    logger.info(f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed")
    return indexed, skipped


async def _index_with_delta_sync(
    drive_client: GoogleDriveClient,
    session: AsyncSession,
    connector: object,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    folder_id: str | None,
    start_page_token: str,
    task_logger: TaskLoggingService,
    log_entry: object,
    max_files: int,
    include_subfolders: bool = False,
    on_heartbeat_callback: HeartbeatCallbackType | None = None,
    enable_summary: bool = True,
) -> tuple[int, int]:
    """Delta sync using Drive change tracking from ``start_page_token``.

    ``connector`` and ``include_subfolders`` are unused but kept for
    signature parity with _index_full_scan.

    Returns:
        (indexed, skipped) counts.

    Raises:
        Exception: if fetching the change feed fails (auth failures get a
            dedicated re-authentication message).
    """
    await task_logger.log_task_progress(
        log_entry,
        f"Starting delta sync from token: {start_page_token[:20]}...",
        {"stage": "delta_sync", "start_token": start_page_token},
    )

    changes, _final_token, error = await fetch_all_changes(drive_client, start_page_token, folder_id)
    if error:
        err_lower = error.lower()
        # Case-insensitive auth detection, consistent with _index_full_scan.
        if "401" in error or "invalid credentials" in err_lower or "autherror" in err_lower:
            raise Exception(
                f"Google Drive authentication failed. Please re-authenticate. (Error: {error})"
            )
        raise Exception(f"Failed to fetch Google Drive changes: {error}")

    if not changes:
        logger.info("No changes detected since last sync")
        return 0, 0

    logger.info(f"Processing {len(changes)} changes")
    indexed = 0
    skipped = 0
    failed = 0
    files_processed = 0
    last_heartbeat = time.time()

    for change in changes:
        if files_processed >= max_files:
            break
        files_processed += 1
        change_type = categorize_change(change)

        # Deletions/trashing remove the mirrored document outright.
        if change_type in ["removed", "trashed"]:
            fid = change.get("fileId")
            if fid:
                await _remove_document(session, fid, search_space_id)
            continue

        file = change.get("file")
        if not file:
            continue

        if on_heartbeat_callback:
            now = time.time()
            if now - last_heartbeat >= HEARTBEAT_INTERVAL_SECONDS:
                await on_heartbeat_callback(indexed)
                last_heartbeat = now

        i, s, f = await _process_single_file(
            drive_client, session, file,
            connector_id, search_space_id, user_id, enable_summary,
        )
        indexed += i
        skipped += s
        failed += f

        # Batch commit; guard on `i` so skips don't trigger repeat commits.
        if i and indexed % 10 == 0:
            await session.commit()

    logger.info(f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed")
    return indexed, skipped
- - Args: - session: Database session - connector_id: ID of the Drive connector - search_space_id: ID of the search space - user_id: ID of the user - folder_id: Specific folder to index (from UI/request, takes precedence) - folder_name: Folder name for display (from UI/request) - use_delta_sync: Whether to use change tracking for incremental sync - update_last_indexed: Whether to update last_indexed_at timestamp - max_files: Maximum number of files to index - include_subfolders: Whether to recursively index files in subfolders - on_heartbeat_callback: Optional callback to update notification during long-running indexing. - - Returns: - Tuple of (number_of_indexed_files, number_of_skipped_files, error_message) - """ + """Index Google Drive files for a specific connector.""" task_logger = TaskLoggingService(session, search_space_id) - log_entry = await task_logger.log_task_start( task_name="google_drive_files_indexing", source="connector_indexing_task", message=f"Starting Google Drive indexing for connector {connector_id}", metadata={ - "connector_id": connector_id, - "user_id": str(user_id), - "folder_id": folder_id, - "use_delta_sync": use_delta_sync, - "max_files": max_files, + "connector_id": connector_id, "user_id": str(user_id), + "folder_id": folder_id, "use_delta_sync": use_delta_sync, "max_files": max_files, }, ) try: - # Accept both native and Composio Drive connectors connector = None for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES: connector = await get_connector_by_id(session, connector_id, ct) if connector: break - if not connector: error_msg = f"Google Drive connector with ID {connector_id} not found" - await task_logger.log_task_failure( - log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} - ) + await task_logger.log_task_failure(log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}) return 0, 0, error_msg await task_logger.log_task_progress( - log_entry, - f"Initializing Google Drive client for connector {connector_id}", + log_entry, 
f"Initializing Google Drive client for connector {connector_id}", {"stage": "client_initialization"}, ) - # Build credentials based on connector type pre_built_credentials = None if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: connected_account_id = connector.config.get("composio_connected_account_id") if not connected_account_id: error_msg = f"Composio connected_account_id not found for connector {connector_id}" - await task_logger.log_task_failure( - log_entry, - error_msg, - "Missing Composio account", - {"error_type": "MissingComposioAccount"}, - ) + await task_logger.log_task_failure(log_entry, error_msg, "Missing Composio account", {"error_type": "MissingComposioAccount"}) return 0, 0, error_msg pre_built_credentials = build_composio_credentials(connected_account_id) else: token_encrypted = connector.config.get("_token_encrypted", False) - if token_encrypted: - if not config.SECRET_KEY: - await task_logger.log_task_failure( - log_entry, - f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}", - "Missing SECRET_KEY for token decryption", - {"error_type": "MissingSecretKey"}, - ) - return ( - 0, - 0, - "SECRET_KEY not configured but credentials are marked as encrypted", - ) - logger.info( - f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization" + if token_encrypted and not config.SECRET_KEY: + await task_logger.log_task_failure( + log_entry, "SECRET_KEY not configured but credentials are encrypted", + "Missing SECRET_KEY", {"error_type": "MissingSecretKey"}, ) + return 0, 0, "SECRET_KEY not configured but credentials are marked as encrypted" connector_enable_summary = getattr(connector, "enable_summary", True) - - drive_client = GoogleDriveClient( - session, connector_id, credentials=pre_built_credentials - ) + drive_client = GoogleDriveClient(session, connector_id, credentials=pre_built_credentials) if not folder_id: error_msg = "folder_id is 
required for Google Drive indexing" - await task_logger.log_task_failure( - log_entry, error_msg, {"error_type": "MissingParameter"} - ) + await task_logger.log_task_failure(log_entry, error_msg, {"error_type": "MissingParameter"}) return 0, 0, error_msg target_folder_id = folder_id target_folder_name = folder_name or "Selected Folder" - logger.info( - f"Indexing Google Drive folder: {target_folder_name} ({target_folder_id})" - ) - folder_tokens = connector.config.get("folder_tokens", {}) start_page_token = folder_tokens.get(target_folder_id) - can_use_delta_sync = ( - use_delta_sync and start_page_token and connector.last_indexed_at - ) + can_use_delta = use_delta_sync and start_page_token and connector.last_indexed_at - if can_use_delta_sync: + if can_use_delta: logger.info(f"Using delta sync for connector {connector_id}") - result = await _index_with_delta_sync( - drive_client=drive_client, - session=session, - connector=connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - folder_id=target_folder_id, - start_page_token=start_page_token, - task_logger=task_logger, - log_entry=log_entry, - max_files=max_files, - include_subfolders=include_subfolders, - on_heartbeat_callback=on_heartbeat_callback, - enable_summary=connector_enable_summary, + documents_indexed, documents_skipped = await _index_with_delta_sync( + drive_client, session, connector, connector_id, search_space_id, user_id, + target_folder_id, start_page_token, task_logger, log_entry, max_files, + include_subfolders, on_heartbeat_callback, connector_enable_summary, ) - documents_indexed, documents_skipped = result - - # Reconciliation: full scan re-indexes documents that were manually - # deleted from SurfSense but still exist in Google Drive. - # Already-indexed files are skipped via md5/modifiedTime checks, - # so the overhead is just one API listing call + fast DB lookups. 
logger.info("Running reconciliation scan after delta sync") - reconcile_result = await _index_full_scan( - drive_client=drive_client, - session=session, - connector=connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - folder_id=target_folder_id, - folder_name=target_folder_name, - task_logger=task_logger, - log_entry=log_entry, - max_files=max_files, - include_subfolders=include_subfolders, - on_heartbeat_callback=on_heartbeat_callback, - enable_summary=connector_enable_summary, + ri, rs = await _index_full_scan( + drive_client, session, connector, connector_id, search_space_id, user_id, + target_folder_id, target_folder_name, task_logger, log_entry, max_files, + include_subfolders, on_heartbeat_callback, connector_enable_summary, ) - documents_indexed += reconcile_result[0] - documents_skipped += reconcile_result[1] + documents_indexed += ri + documents_skipped += rs else: logger.info(f"Using full scan for connector {connector_id}") - result = await _index_full_scan( - drive_client=drive_client, - session=session, - connector=connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - folder_id=target_folder_id, - folder_name=target_folder_name, - task_logger=task_logger, - log_entry=log_entry, - max_files=max_files, - include_subfolders=include_subfolders, - on_heartbeat_callback=on_heartbeat_callback, - enable_summary=connector_enable_summary, + documents_indexed, documents_skipped = await _index_full_scan( + drive_client, session, connector, connector_id, search_space_id, user_id, + target_folder_id, target_folder_name, task_logger, log_entry, max_files, + include_subfolders, on_heartbeat_callback, connector_enable_summary, ) - documents_indexed, documents_skipped = result - if documents_indexed > 0 or can_use_delta_sync: + if documents_indexed > 0 or can_use_delta: new_token, token_error = await get_start_page_token(drive_client) if new_token and not token_error: - from 
sqlalchemy.orm.attributes import flag_modified - - # Refresh connector to reload attributes that may have been expired by earlier commits await session.refresh(connector) - if "folder_tokens" not in connector.config: connector.config["folder_tokens"] = {} connector.config["folder_tokens"][target_folder_id] = new_token flag_modified(connector, "config") - await update_connector_last_indexed(session, connector, update_last_indexed) await session.commit() - logger.info("Successfully committed Google Drive indexing changes to database") await task_logger.log_task_success( log_entry, f"Successfully completed Google Drive indexing for connector {connector_id}", { - "files_processed": documents_indexed, - "files_skipped": documents_skipped, - "sync_type": "delta" if can_use_delta_sync else "full", - "folder": target_folder_name, + "files_processed": documents_indexed, "files_skipped": documents_skipped, + "sync_type": "delta" if can_use_delta else "full", "folder": target_folder_name, }, ) - - logger.info( - f"Google Drive indexing completed: {documents_indexed} files indexed, {documents_skipped} skipped" - ) + logger.info(f"Google Drive indexing completed: {documents_indexed} indexed, {documents_skipped} skipped") return documents_indexed, documents_skipped, None except SQLAlchemyError as db_error: await session.rollback() await task_logger.log_task_failure( - log_entry, - f"Database error during Google Drive indexing for connector {connector_id}", - str(db_error), - {"error_type": "SQLAlchemyError"}, + log_entry, f"Database error during Google Drive indexing for connector {connector_id}", + str(db_error), {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( - log_entry, - f"Failed to index Google Drive files for connector {connector_id}", - str(e), - {"error_type": type(e).__name__}, + log_entry, 
f"Failed to index Google Drive files for connector {connector_id}", + str(e), {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True) return 0, 0, f"Failed to index Google Drive files: {e!s}" @@ -304,964 +608,81 @@ async def index_google_drive_single_file( file_id: str, file_name: str | None = None, ) -> tuple[int, str | None]: - """ - Index a single Google Drive file by its ID. - - Args: - session: Database session - connector_id: ID of the Drive connector - search_space_id: ID of the search space - user_id: ID of the user - file_id: Specific file ID to index - file_name: File name for display (optional) - - Returns: - Tuple of (number_of_indexed_files, error_message) - """ + """Index a single Google Drive file by its ID.""" task_logger = TaskLoggingService(session, search_space_id) - log_entry = await task_logger.log_task_start( task_name="google_drive_single_file_indexing", source="connector_indexing_task", message=f"Starting Google Drive single file indexing for file {file_id}", - metadata={ - "connector_id": connector_id, - "user_id": str(user_id), - "file_id": file_id, - "file_name": file_name, - }, + metadata={"connector_id": connector_id, "user_id": str(user_id), "file_id": file_id, "file_name": file_name}, ) try: - # Accept both native and Composio Drive connectors connector = None for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES: connector = await get_connector_by_id(session, connector_id, ct) if connector: break - if not connector: error_msg = f"Google Drive connector with ID {connector_id} not found" - await task_logger.log_task_failure( - log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} - ) + await task_logger.log_task_failure(log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}) return 0, error_msg - await task_logger.log_task_progress( - log_entry, - f"Initializing Google Drive client for connector {connector_id}", - {"stage": "client_initialization"}, - ) - pre_built_credentials 
= None if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: connected_account_id = connector.config.get("composio_connected_account_id") if not connected_account_id: error_msg = f"Composio connected_account_id not found for connector {connector_id}" - await task_logger.log_task_failure( - log_entry, - error_msg, - "Missing Composio account", - {"error_type": "MissingComposioAccount"}, - ) + await task_logger.log_task_failure(log_entry, error_msg, "Missing Composio account", {"error_type": "MissingComposioAccount"}) return 0, error_msg pre_built_credentials = build_composio_credentials(connected_account_id) else: token_encrypted = connector.config.get("_token_encrypted", False) - if token_encrypted: - if not config.SECRET_KEY: - await task_logger.log_task_failure( - log_entry, - f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}", - "Missing SECRET_KEY for token decryption", - {"error_type": "MissingSecretKey"}, - ) - return ( - 0, - "SECRET_KEY not configured but credentials are marked as encrypted", - ) - logger.info( - f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization" + if token_encrypted and not config.SECRET_KEY: + await task_logger.log_task_failure( + log_entry, "SECRET_KEY not configured but credentials are encrypted", + "Missing SECRET_KEY", {"error_type": "MissingSecretKey"}, ) + return 0, "SECRET_KEY not configured but credentials are marked as encrypted" connector_enable_summary = getattr(connector, "enable_summary", True) + drive_client = GoogleDriveClient(session, connector_id, credentials=pre_built_credentials) - drive_client = GoogleDriveClient( - session, connector_id, credentials=pre_built_credentials - ) - - # Fetch the file metadata file, error = await get_file_by_id(drive_client, file_id) - if error or not file: error_msg = f"Failed to fetch file {file_id}: {error or 'File not found'}" - await task_logger.log_task_failure( - 
log_entry, error_msg, {"error_type": "FileNotFound"} - ) + await task_logger.log_task_failure(log_entry, error_msg, {"error_type": "FileNotFound"}) return 0, error_msg display_name = file_name or file.get("name", "Unknown") - logger.info(f"Indexing Google Drive file: {display_name} ({file_id})") - # Create pending document for status visibility - pending_doc, should_skip = await _create_pending_document_for_file( - session=session, - file=file, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - - if should_skip: - await task_logger.log_task_progress( - log_entry, - f"File {display_name} is unchanged or not indexable", - {"status": "skipped"}, - ) - return 0, None - - # Commit pending document so it appears in UI - if pending_doc and pending_doc.id is None: - await session.commit() - - # Process the file indexed, _skipped, failed = await _process_single_file( - drive_client=drive_client, - session=session, - file=file, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, - pending_document=pending_doc, - enable_summary=connector_enable_summary, + drive_client, session, file, + connector_id, search_space_id, user_id, connector_enable_summary, ) - await session.commit() - logger.info( - "Successfully committed Google Drive file indexing changes to database" - ) if failed > 0: error_msg = f"Failed to index file {display_name}" - await task_logger.log_task_failure( - log_entry, - error_msg, - {"file_name": display_name, "file_id": file_id}, - ) + await task_logger.log_task_failure(log_entry, error_msg, {"file_name": display_name, "file_id": file_id}) return 0, error_msg if indexed > 0: await task_logger.log_task_success( - log_entry, - f"Successfully indexed file {display_name}", - { - "file_name": display_name, - "file_id": file_id, - }, + log_entry, f"Successfully indexed file {display_name}", + {"file_name": display_name, "file_id": file_id}, ) - 
logger.info(f"Google Drive file indexing completed: {display_name}") return 1, None - else: - await task_logger.log_task_progress( - log_entry, - f"File {display_name} was skipped", - {"status": "skipped"}, - ) - return 0, None + + return 0, None except SQLAlchemyError as db_error: await session.rollback() - await task_logger.log_task_failure( - log_entry, - "Database error during file indexing", - str(db_error), - {"error_type": "SQLAlchemyError"}, - ) + await task_logger.log_task_failure(log_entry, "Database error during file indexing", str(db_error), {"error_type": "SQLAlchemyError"}) logger.error(f"Database error: {db_error!s}", exc_info=True) return 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() - await task_logger.log_task_failure( - log_entry, - "Failed to index Google Drive file", - str(e), - {"error_type": type(e).__name__}, - ) + await task_logger.log_task_failure(log_entry, "Failed to index Google Drive file", str(e), {"error_type": type(e).__name__}) logger.error(f"Failed to index Google Drive file: {e!s}", exc_info=True) return 0, f"Failed to index Google Drive file: {e!s}" - - -async def _index_full_scan( - drive_client: GoogleDriveClient, - session: AsyncSession, - connector: any, - connector_id: int, - search_space_id: int, - user_id: str, - folder_id: str | None, - folder_name: str, - task_logger: TaskLoggingService, - log_entry: any, - max_files: int, - include_subfolders: bool = False, - on_heartbeat_callback: HeartbeatCallbackType | None = None, - enable_summary: bool = True, -) -> tuple[int, int]: - """Perform full scan indexing of a folder. 
- - Implements 2-phase document status updates for real-time UI feedback: - - Phase 1: Collect all files and create pending documents (visible in UI immediately) - - Phase 2: Process each file: pending → processing → ready/failed - """ - await task_logger.log_task_progress( - log_entry, - f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})", - { - "stage": "full_scan", - "folder_id": folder_id, - "include_subfolders": include_subfolders, - }, - ) - - documents_indexed = 0 - documents_skipped = 0 - documents_failed = 0 - files_processed = 0 - - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() - - # ======================================================================= - # PHASE 1: Collect all files and create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - files_to_process = [] # List of (file, pending_document or None) - new_documents_created = False - - # Queue of folders to process: (folder_id, folder_name) - folders_to_process = [(folder_id, folder_name)] - first_listing_error: str | None = None - - logger.info("Phase 1: Collecting files and creating pending documents") - - while folders_to_process and files_processed < max_files: - current_folder_id, current_folder_name = folders_to_process.pop(0) - logger.info(f"Scanning folder: {current_folder_name} ({current_folder_id})") - page_token = None - - while files_processed < max_files: - # Get files and folders in current folder - files, next_token, error = await get_files_in_folder( - drive_client, - current_folder_id, - include_subfolders=True, - page_token=page_token, - ) - - if error: - logger.error(f"Error listing files in {current_folder_name}: {error}") - if first_listing_error is None: - first_listing_error = error - break - - if not files: - break - - for file in files: - if 
files_processed >= max_files: - break - - mime_type = file.get("mimeType", "") - - # If this is a folder and include_subfolders is enabled, queue it for processing - if mime_type == "application/vnd.google-apps.folder": - if include_subfolders: - folders_to_process.append( - (file["id"], file.get("name", "Unknown")) - ) - logger.debug(f"Queued subfolder: {file.get('name', 'Unknown')}") - continue - - files_processed += 1 - - # Create pending document for this file - pending_doc, should_skip = await _create_pending_document_for_file( - session=session, - file=file, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - - if should_skip: - documents_skipped += 1 - continue - - if pending_doc and pending_doc.id is None: - # New document was created - new_documents_created = True - - files_to_process.append((file, pending_doc)) - - page_token = next_token - if not page_token: - break - - if not files_to_process and first_listing_error: - error_lower = first_listing_error.lower() - if ( - "401" in first_listing_error - or "invalid credentials" in error_lower - or "authError" in first_listing_error - ): - raise Exception( - f"Google Drive authentication failed. Please re-authenticate. 
" - f"(Error: {first_listing_error})" - ) - raise Exception(f"Failed to list Google Drive files: {first_listing_error}") - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([f for f in files_to_process if f[1] and f[1].id is None])} pending documents" - ) - await session.commit() - - # ======================================================================= - # PHASE 2: Process each file one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(files_to_process)} files") - - for file, pending_doc in files_to_process: - # Check if it's time for a heartbeat update - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - indexed, skipped, failed = await _process_single_file( - drive_client=drive_client, - session=session, - file=file, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, - pending_document=pending_doc, - enable_summary=enable_summary, - ) - - documents_indexed += indexed - documents_skipped += skipped - documents_failed += failed - - if documents_indexed % 10 == 0 and documents_indexed > 0: - await session.commit() - logger.info(f"Committed batch: {documents_indexed} files indexed so far") - - logger.info( - f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed" - ) - return documents_indexed, documents_skipped - - -async def _index_with_delta_sync( - drive_client: GoogleDriveClient, - session: AsyncSession, - connector: any, - connector_id: int, - search_space_id: int, - user_id: str, - folder_id: str | None, - start_page_token: str, - task_logger: 
TaskLoggingService, - log_entry: any, - max_files: int, - include_subfolders: bool = False, - on_heartbeat_callback: HeartbeatCallbackType | None = None, - enable_summary: bool = True, -) -> tuple[int, int]: - """Perform delta sync indexing using change tracking. - - Note: include_subfolders is accepted for API consistency but delta sync - automatically tracks changes across all folders including subfolders. - - Implements 2-phase document status updates for real-time UI feedback: - - Phase 1: Collect all changes and create pending documents (visible in UI immediately) - - Phase 2: Process each file: pending → processing → ready/failed - """ - await task_logger.log_task_progress( - log_entry, - f"Starting delta sync from token: {start_page_token[:20]}...", - {"stage": "delta_sync", "start_token": start_page_token}, - ) - - changes, _final_token, error = await fetch_all_changes( - drive_client, start_page_token, folder_id - ) - - if error: - logger.error(f"Error fetching changes: {error}") - error_lower = error.lower() - if ( - "401" in error - or "invalid credentials" in error_lower - or "authError" in error - ): - raise Exception( - f"Google Drive authentication failed. Please re-authenticate. 
" - f"(Error: {error})" - ) - raise Exception(f"Failed to fetch Google Drive changes: {error}") - - if not changes: - logger.info("No changes detected since last sync") - return 0, 0 - - logger.info(f"Processing {len(changes)} changes") - - documents_indexed = 0 - documents_skipped = 0 - documents_failed = 0 - files_processed = 0 - - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() - - # ======================================================================= - # PHASE 1: Analyze changes and create pending documents for new/modified files - # ======================================================================= - changes_to_process = [] # List of (change, file, pending_document or None) - new_documents_created = False - - logger.info("Phase 1: Analyzing changes and creating pending documents") - - for change in changes: - if files_processed >= max_files: - break - - files_processed += 1 - change_type = categorize_change(change) - - if change_type in ["removed", "trashed"]: - file_id = change.get("fileId") - if file_id: - await _remove_document(session, file_id, search_space_id) - continue - - file = change.get("file") - if not file: - continue - - # Create pending document for this file - pending_doc, should_skip = await _create_pending_document_for_file( - session=session, - file=file, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - - if should_skip: - documents_skipped += 1 - continue - - if pending_doc and pending_doc.id is None: - # New document was created - new_documents_created = True - - changes_to_process.append((change, file, pending_doc)) - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info("Phase 1: Committing pending documents") - await session.commit() - - # ======================================================================= - # PHASE 2: Process each file one by one - # Each document 
transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(changes_to_process)} changes") - - for _, file, pending_doc in changes_to_process: - # Check if it's time for a heartbeat update - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - indexed, skipped, failed = await _process_single_file( - drive_client=drive_client, - session=session, - file=file, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, - pending_document=pending_doc, - enable_summary=enable_summary, - ) - - documents_indexed += indexed - documents_skipped += skipped - documents_failed += failed - - if documents_indexed % 10 == 0 and documents_indexed > 0: - await session.commit() - logger.info(f"Committed batch: {documents_indexed} changes processed") - - logger.info( - f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed" - ) - return documents_indexed, documents_skipped - - -async def _create_pending_document_for_file( - session: AsyncSession, - file: dict, - connector_id: int, - search_space_id: int, - user_id: str, -) -> tuple[Document | None, bool]: - """ - Create a pending document for a Google Drive file if it doesn't exist. - - This is Phase 1 of the 2-phase document status update pattern. - Creates documents with 'pending' status so they appear in UI immediately. 
- - Args: - session: Database session - file: File metadata from Google Drive API - connector_id: ID of the Drive connector - search_space_id: ID of the search space - user_id: ID of the user - - Returns: - Tuple of (document, should_skip): - - (existing_doc, False): Existing document that needs update - - (new_pending_doc, False): New pending document created - - (None, True): File should be skipped (unchanged, rename-only, or folder) - """ - from app.connectors.google_drive.file_types import should_skip_file - - file_id = file.get("id") - file_name = file.get("name", "Unknown") - mime_type = file.get("mimeType", "") - - # Skip folders and shortcuts - if should_skip_file(mime_type): - return None, True - - if not file_id: - return None, True - - # Generate unique identifier hash for this file - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id - ) - - # Check if document exists (primary hash first, then legacy Composio hash) - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - if not existing_document: - legacy_hash = generate_unique_identifier_hash( - DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id - ) - existing_document = await check_document_by_unique_identifier( - session, legacy_hash - ) - if existing_document: - existing_document.unique_identifier_hash = unique_identifier_hash - if ( - existing_document.document_type - == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR - ): - existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE - logger.info(f"Migrated legacy Composio document to native type: {file_id}") - - if existing_document: - # Check if this is a rename-only update (content unchanged) - incoming_md5 = file.get("md5Checksum") - incoming_modified_time = file.get("modifiedTime") - doc_metadata = existing_document.document_metadata or {} - stored_md5 = doc_metadata.get("md5_checksum") - stored_modified_time 
= doc_metadata.get("modified_time") - - # Determine if content changed - content_unchanged = False - if incoming_md5 and stored_md5: - content_unchanged = incoming_md5 == stored_md5 - elif not incoming_md5 and incoming_modified_time and stored_modified_time: - # Google Workspace file - use modifiedTime as fallback - content_unchanged = incoming_modified_time == stored_modified_time - - if content_unchanged: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - return None, True - - # Content changed - return existing document for update - return existing_document, False - - # Create new pending document - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType.GOOGLE_DRIVE_FILE, - document_metadata={ - "google_drive_file_id": file_id, - "google_drive_file_name": file_name, - "google_drive_mime_type": mime_type, - "connector_id": connector_id, - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - - return document, False - - -async def _check_rename_only_update( - session: AsyncSession, - file: dict, - search_space_id: int, -) -> tuple[bool, str | None]: - """ - Check if a file only needs a rename update (no content change). - - Uses md5Checksum comparison (preferred) or modifiedTime (fallback for Google Workspace files) - to detect if content has changed. This optimization prevents unnecessary ETL API calls - (Docling/LlamaCloud) for rename-only operations. 
- - Args: - session: Database session - file: File metadata from Google Drive API - search_space_id: ID of the search space - - Returns: - Tuple of (is_rename_only, message) - - (True, message): Only filename changed, document was updated - - (False, None): Content changed or new file, needs full processing - """ - from sqlalchemy import String, cast, select - from sqlalchemy.orm.attributes import flag_modified - - from app.db import Document - - file_id = file.get("id") - file_name = file.get("name", "Unknown") - incoming_md5 = file.get("md5Checksum") # None for Google Workspace files - incoming_modified_time = file.get("modifiedTime") - - if not file_id: - return False, None - - # Try to find existing document by file_id-based hash (primary method) - primary_hash = generate_unique_identifier_hash( - DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id - ) - existing_document = await check_document_by_unique_identifier(session, primary_hash) - - # Fallback: legacy Composio hash - if not existing_document: - legacy_hash = generate_unique_identifier_hash( - DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id - ) - existing_document = await check_document_by_unique_identifier( - session, legacy_hash - ) - - # Fallback: metadata search (covers old filename-based hashes) - if not existing_document: - result = await session.execute( - select(Document).where( - Document.search_space_id == search_space_id, - Document.document_type.in_( - [ - DocumentType.GOOGLE_DRIVE_FILE, - DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, - ] - ), - cast(Document.document_metadata["google_drive_file_id"], String) - == file_id, - ) - ) - existing_document = result.scalar_one_or_none() - if existing_document: - logger.debug(f"Found legacy document by metadata for file_id: {file_id}") - - # Migrate legacy Composio document to native type - if existing_document: - if existing_document.unique_identifier_hash != primary_hash: - existing_document.unique_identifier_hash = 
primary_hash - if ( - existing_document.document_type - == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR - ): - existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE - logger.info(f"Migrated legacy Composio Drive document: {file_id}") - - if not existing_document: - # New file, needs full processing - return False, None - - # Get stored checksums/timestamps from document metadata - doc_metadata = existing_document.document_metadata or {} - stored_md5 = doc_metadata.get("md5_checksum") - stored_modified_time = doc_metadata.get("modified_time") - - # Determine if content changed using md5Checksum (preferred) or modifiedTime (fallback) - content_unchanged = False - - if incoming_md5 and stored_md5: - # Best case: Compare md5 checksums (only changes when content changes, not on rename) - content_unchanged = incoming_md5 == stored_md5 - logger.debug(f"MD5 comparison for {file_name}: unchanged={content_unchanged}") - elif incoming_md5 and not stored_md5: - # Have incoming md5 but no stored md5 (legacy doc) - need to reprocess to store it - logger.debug( - f"No stored md5 for {file_name}, will reprocess to store md5_checksum" - ) - return False, None - elif not incoming_md5: - # Google Workspace file (no md5Checksum available) - fall back to modifiedTime - # Note: modifiedTime is less reliable as it changes on rename too, but it's the best we have - if incoming_modified_time and stored_modified_time: - content_unchanged = incoming_modified_time == stored_modified_time - logger.debug( - f"ModifiedTime fallback for Google Workspace file {file_name}: unchanged={content_unchanged}" - ) - else: - # No stored modifiedTime (legacy) - reprocess to store it - return False, None - - if content_unchanged: - # Content hasn't changed - check if filename changed - old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get( - "google_drive_file_name" - ) - - if old_name and old_name != file_name: - # Rename-only update - update the document without re-processing - 
existing_document.title = file_name - if not existing_document.document_metadata: - existing_document.document_metadata = {} - existing_document.document_metadata["FILE_NAME"] = file_name - existing_document.document_metadata["google_drive_file_name"] = file_name - # Also update modified_time for Google Workspace files (since it changed on rename) - if incoming_modified_time: - existing_document.document_metadata["modified_time"] = ( - incoming_modified_time - ) - flag_modified(existing_document, "document_metadata") - await session.commit() - - logger.info( - f"Rename-only update: '{old_name}' → '{file_name}' (skipped ETL)" - ) - return ( - True, - f"File renamed: '{old_name}' → '{file_name}' (no content change)", - ) - else: - # Neither content nor name changed - logger.debug(f"File unchanged: {file_name}") - return True, "File unchanged (same content and name)" - - # Content changed - needs full processing - return False, None - - -async def _process_single_file( - drive_client: GoogleDriveClient, - session: AsyncSession, - file: dict, - connector_id: int, - search_space_id: int, - user_id: str, - task_logger: TaskLoggingService, - log_entry: any, - pending_document: Document | None = None, - enable_summary: bool = True, -) -> tuple[int, int, int]: - """ - Process a single file by downloading and using Surfsense's file processor. - - Implements Phase 2 of the 2-phase document status update pattern. 
- Updates document status: pending → processing → ready/failed - - Args: - drive_client: Google Drive client - session: Database session - file: File metadata from Google Drive API - connector_id: ID of the connector - search_space_id: ID of the search space - user_id: ID of the user - task_logger: Task logging service - log_entry: Log entry for tracking - pending_document: Optional pending document created in Phase 1 - - Returns: - Tuple of (indexed_count, skipped_count, failed_count) - """ - file_name = file.get("name", "Unknown") - mime_type = file.get("mimeType", "") - file_id = file.get("id") - - try: - logger.info(f"Processing file: {file_name} ({mime_type})") - - # Early check: Is this a rename-only update? - # This optimization prevents downloading and ETL processing for files - # where only the name changed but content is the same. - is_rename_only, rename_message = await _check_rename_only_update( - session=session, - file=file, - search_space_id=search_space_id, - ) - - if is_rename_only: - await task_logger.log_task_progress( - log_entry, - f"Skipped ETL for {file_name}: {rename_message}", - {"status": "rename_only", "reason": rename_message}, - ) - # Return 1 for renamed files (they are "indexed" in the sense that they're updated) - # Return 0 for unchanged files - if "renamed" in (rename_message or "").lower(): - return 1, 0, 0 - return 0, 1, 0 - - # Set document to PROCESSING status if we have a pending document - if pending_document: - pending_document.status = DocumentStatus.processing() - await session.commit() - - _, error, _metadata = await download_and_process_file( - client=drive_client, - file=file, - search_space_id=search_space_id, - user_id=user_id, - session=session, - task_logger=task_logger, - log_entry=log_entry, - connector_id=connector_id, - enable_summary=enable_summary, - ) - - if error: - await task_logger.log_task_progress( - log_entry, - f"Skipped {file_name}: {error}", - {"status": "skipped", "reason": error}, - ) - # Mark 
pending document as failed if it exists - if pending_document: - pending_document.status = DocumentStatus.failed(error) - pending_document.updated_at = get_current_timestamp() - await session.commit() - return 0, 1, 0 - - # The document was created/updated by download_and_process_file - # Find the document and ensure it has READY status - if file_id: - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id - ) - processed_doc = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - # Ensure status is READY - if processed_doc and not DocumentStatus.is_state( - processed_doc.status, DocumentStatus.READY - ): - processed_doc.status = DocumentStatus.ready() - processed_doc.updated_at = get_current_timestamp() - await session.commit() - - logger.info(f"Successfully indexed Google Drive file: {file_name}") - return 1, 0, 0 - - except Exception as e: - logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True) - # Mark pending document as failed if it exists - if pending_document: - try: - pending_document.status = DocumentStatus.failed(str(e)) - pending_document.updated_at = get_current_timestamp() - await session.commit() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - return 0, 0, 1 - - -async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int): - """Remove a document that was deleted in Drive. - - Handles both new (file_id-based) and legacy (filename-based) hash schemes. 
- """ - from sqlalchemy import String, cast, select - - from app.db import Document - - # First try with file_id-based hash (new method) - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id - ) - - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Fallback: legacy Composio hash - if not existing_document: - legacy_hash = generate_unique_identifier_hash( - DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id - ) - existing_document = await check_document_by_unique_identifier( - session, legacy_hash - ) - - # Fallback: metadata search (covers old filename-based hashes, both native and Composio) - if not existing_document: - result = await session.execute( - select(Document).where( - Document.search_space_id == search_space_id, - Document.document_type.in_( - [ - DocumentType.GOOGLE_DRIVE_FILE, - DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, - ] - ), - cast(Document.document_metadata["google_drive_file_id"], String) - == file_id, - ) - ) - existing_document = result.scalar_one_or_none() - if existing_document: - logger.info(f"Found legacy document by metadata for file_id: {file_id}") - - if existing_document: - await session.delete(existing_document) - logger.info(f"Removed deleted file document: {file_id}") diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 384ad85e2..96cc1cbb4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -1,11 +1,11 @@ """ Google Gmail connector indexer. 
-Implements 2-phase document status updates for real-time UI feedback: -- Phase 1: Create all documents with 'pending' status (visible in UI immediately) -- Phase 2: Process each document: pending → processing → ready/failed +Uses the shared IndexingPipelineService for document deduplication, +summarization, chunking, and embedding. """ +import logging import time from collections.abc import Awaitable, Callable from datetime import datetime @@ -15,21 +15,15 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.google_gmail_connector import GoogleGmailConnector -from app.db import ( - Document, - DocumentStatus, - DocumentType, - SearchSourceConnectorType, +from app.db import DocumentType, SearchSourceConnectorType +from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_hashing import ( + compute_content_hash, + compute_unique_identifier_hash, ) +from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) from app.utils.google_credentials import ( COMPOSIO_GOOGLE_CONNECTOR_TYPES, build_composio_credentials, @@ -37,12 +31,9 @@ from app.utils.google_credentials import ( from .base import ( calculate_date_range, - check_document_by_unique_identifier, check_duplicate_document_by_hash, get_connector_by_id, - get_current_timestamp, logger, - safe_set_chunks, update_connector_last_indexed, ) @@ -51,13 +42,70 @@ ACCEPTED_GMAIL_CONNECTOR_TYPES = { SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, } -# Type hint for heartbeat callback HeartbeatCallbackType = Callable[[int], Awaitable[None]] - -# Heartbeat interval in seconds 
HEARTBEAT_INTERVAL_SECONDS = 30 +def _build_connector_doc( + message: dict, + markdown_content: str, + *, + connector_id: int, + search_space_id: int, + user_id: str, + enable_summary: bool, +) -> ConnectorDocument: + """Map a raw Gmail API message dict to a ConnectorDocument.""" + message_id = message.get("id", "") + thread_id = message.get("threadId", "") + payload = message.get("payload", {}) + headers = payload.get("headers", []) + + subject = "No Subject" + sender = "Unknown Sender" + date_str = "Unknown Date" + + for header in headers: + name = header.get("name", "").lower() + value = header.get("value", "") + if name == "subject": + subject = value + elif name == "from": + sender = value + elif name == "date": + date_str = value + + metadata = { + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date": date_str, + "connector_id": connector_id, + "document_type": "Gmail Message", + "connector_type": "Google Gmail", + } + + fallback_summary = ( + f"Google Gmail Message: {subject}\n\n" + f"From: {sender}\nDate: {date_str}\n\n" + f"{markdown_content}" + ) + + return ConnectorDocument( + title=subject, + source_markdown=markdown_content, + unique_id=message_id, + document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR, + search_space_id=search_space_id, + connector_id=connector_id, + created_by_id=user_id, + should_summarize=enable_summary, + fallback_summary=fallback_summary, + metadata=metadata, + ) + + async def index_google_gmail_messages( session: AsyncSession, connector_id: int, @@ -80,7 +128,7 @@ async def index_google_gmail_messages( start_date: Start date for filtering messages (YYYY-MM-DD format) end_date: End date for filtering messages (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) - max_messages: Maximum number of messages to fetch (default: 100) + max_messages: Maximum number of messages to fetch (default: 1000) on_heartbeat_callback: Optional 
callback to update notification during long-running indexing. Returns: @@ -88,7 +136,6 @@ async def index_google_gmail_messages( """ task_logger = TaskLoggingService(session, search_space_id) - # Log task start log_entry = await task_logger.log_task_start( task_name="google_gmail_messages_indexing", source="connector_indexing_task", @@ -103,7 +150,7 @@ async def index_google_gmail_messages( ) try: - # Accept both native and Composio Gmail connectors + # ── Connector lookup ────────────────────────────────────────── connector = None for ct in ACCEPTED_GMAIL_CONNECTOR_TYPES: connector = await get_connector_by_id(session, connector_id, ct) @@ -117,7 +164,7 @@ async def index_google_gmail_messages( ) return 0, 0, error_msg - # Build credentials based on connector type + # ── Credential building ─────────────────────────────────────── if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: connected_account_id = connector.config.get("composio_connected_account_id") if not connected_account_id: @@ -189,6 +236,7 @@ async def index_google_gmail_messages( ) return 0, 0, "Google gmail credentials not found in connector config" + # ── Gmail client init ───────────────────────────────────────── await task_logger.log_task_progress( log_entry, f"Initializing Google gmail client for connector {connector_id}", @@ -199,14 +247,11 @@ async def index_google_gmail_messages( credentials, session, user_id, connector_id ) - # Calculate date range using last_indexed_at if dates not provided - # This ensures Gmail uses the same date logic as other connectors - # (uses last_indexed_at → now, or 365 days back for first-time indexing) calculated_start_date, calculated_end_date = calculate_date_range( connector, start_date, end_date, default_days_back=365 ) - # Fetch recent Google gmail messages + # ── Fetch messages ──────────────────────────────────────────── logger.info( f"Fetching emails for connector {connector_id} " f"from {calculated_start_date} to {calculated_end_date}" @@ 
-218,7 +263,6 @@ async def index_google_gmail_messages( ) if error: - # Check if this is an authentication error that requires re-authentication error_message = error error_type = "APIError" if ( @@ -243,263 +287,92 @@ async def index_google_gmail_messages( logger.info(f"Found {len(messages)} Google gmail messages to index") - documents_indexed = 0 + # ── Build ConnectorDocuments ────────────────────────────────── + connector_docs: list[ConnectorDocument] = [] documents_skipped = 0 - documents_failed = 0 # Track messages that failed processing - duplicate_content_count = ( - 0 # Track messages skipped due to duplicate content_hash - ) - - # Heartbeat tracking - update notification periodically to prevent appearing stuck - last_heartbeat_time = time.time() - - # ======================================================================= - # PHASE 1: Analyze all messages, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - messages_to_process = [] # List of dicts with document and message data - new_documents_created = False + duplicate_content_count = 0 for message in messages: try: - # Extract message information message_id = message.get("id", "") - thread_id = message.get("threadId", "") - - # Extract headers for subject and sender - payload = message.get("payload", {}) - headers = payload.get("headers", []) - - subject = "No Subject" - sender = "Unknown Sender" - date_str = "Unknown Date" - - for header in headers: - name = header.get("name", "").lower() - value = header.get("value", "") - if name == "subject": - subject = value - elif name == "from": - sender = value - elif name == "date": - date_str = value - if not message_id: - logger.warning(f"Skipping message with missing ID: {subject}") + logger.warning("Skipping message with missing ID") documents_skipped += 1 continue - # Format message to markdown markdown_content = 
gmail_connector.format_message_to_markdown(message) - if not markdown_content.strip(): - logger.warning(f"Skipping message with no content: {subject}") + logger.warning(f"Skipping message with no content: {message_id}") documents_skipped += 1 continue - # Generate unique identifier hash for this Gmail message - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.GOOGLE_GMAIL_CONNECTOR, message_id, search_space_id + doc = _build_connector_doc( + message, + markdown_content, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=connector.enable_summary, ) - # Generate content hash - content_hash = generate_content_hash(markdown_content, search_space_id) - - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Fallback: legacy Composio hash - if not existing_document: - legacy_hash = generate_unique_identifier_hash( - DocumentType.COMPOSIO_GMAIL_CONNECTOR, - message_id, - search_space_id, - ) - existing_document = await check_document_by_unique_identifier( - session, legacy_hash - ) - if existing_document: - existing_document.unique_identifier_hash = ( - unique_identifier_hash - ) - if ( - existing_document.document_type - == DocumentType.COMPOSIO_GMAIL_CONNECTOR - ): - existing_document.document_type = ( - DocumentType.GOOGLE_GMAIL_CONNECTOR - ) - logger.info( - f"Migrated legacy Composio Gmail document: {message_id}" - ) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - 
messages_to_process.append( - { - "document": existing_document, - "is_new": False, - "markdown_content": markdown_content, - "content_hash": content_hash, - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date_str": date_str, - } - ) - continue - - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash + duplicate = await check_duplicate_document_by_hash( + session, compute_content_hash(doc) ) - - if duplicate_by_content: + if duplicate: logger.info( - f"Gmail message {subject} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." + f"Gmail message {doc.title} already indexed by another connector " + f"(existing document ID: {duplicate.id}, " + f"type: {duplicate.document_type}). Skipping." 
) duplicate_content_count += 1 documents_skipped += 1 continue - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=subject, - document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR, - document_metadata={ - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date": date_str, - "connector_id": connector_id, - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - messages_to_process.append( - { - "document": document, - "is_new": True, - "markdown_content": markdown_content, - "content_hash": content_hash, - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date_str": date_str, - } - ) + connector_docs.append(doc) except Exception as e: - logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True) - documents_failed += 1 + logger.error(f"Error building ConnectorDocument for message: {e!s}", exc_info=True) + documents_skipped += 1 continue - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" - ) - await session.commit() + # ── Pipeline: migrate legacy docs + prepare + index ─────────── + pipeline = IndexingPipelineService(session) - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # 
======================================================================= - logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + await pipeline.migrate_legacy_docs(connector_docs) - for item in messages_to_process: - # Send heartbeat periodically + documents = await pipeline.prepare_for_indexing(connector_docs) + + doc_map = { + compute_unique_identifier_hash(cd): cd for cd in connector_docs + } + + documents_indexed = 0 + documents_failed = 0 + last_heartbeat_time = time.time() + + for document in documents: if on_heartbeat_callback: current_time = time.time() if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: await on_heartbeat_callback(documents_indexed) last_heartbeat_time = current_time - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() + connector_doc = doc_map.get(document.unique_identifier_hash) + if connector_doc is None: + logger.warning( + f"No matching ConnectorDocument for document {document.id}, skipping" + ) + documents_failed += 1 + continue - # Heavy processing (LLM, embeddings, chunks) + try: user_llm = await get_user_long_context_llm( session, user_id, search_space_id ) - - if user_llm and connector.enable_summary: - document_metadata_for_summary = { - "message_id": item["message_id"], - "thread_id": item["thread_id"], - "subject": item["subject"], - "sender": item["sender"], - "date": item["date_str"], - "document_type": "Gmail Message", - "connector_type": "Google Gmail", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - item["markdown_content"], - user_llm, - document_metadata_for_summary, - ) - else: - summary_content = f"Google Gmail Message: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}\n\n{item['markdown_content']}" - summary_embedding = embed_text(summary_content) - - chunks = await 
create_document_chunks(item["markdown_content"]) - - # Update document to READY with actual content - document.title = item["subject"] - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "message_id": item["message_id"], - "thread_id": item["thread_id"], - "subject": item["subject"], - "sender": item["sender"], - "date": item["date_str"], - "connector_id": connector_id, - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - + await pipeline.index(document, connector_doc, user_llm) documents_indexed += 1 - # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Gmail messages processed so far" @@ -508,21 +381,12 @@ async def index_google_gmail_messages( except Exception as e: logger.error(f"Error processing Gmail message: {e!s}", exc_info=True) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) documents_failed += 1 continue - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit for any remaining documents not yet committed in batches logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed") try: await session.commit() @@ -530,22 +394,18 @@ async def index_google_gmail_messages( "Successfully committed all Google Gmail document changes to database" ) except Exception as e: - # Handle any remaining integrity errors gracefully (race conditions, etc.) 
if ( "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower() ): logger.warning( f"Duplicate content_hash detected during final commit. " - f"This may occur if the same message was indexed by multiple connectors. " f"Rolling back and continuing. Error: {e!s}" ) await session.rollback() - # Don't fail the entire task - some documents may have been successfully indexed else: raise - # Build warning message if there were issues warning_parts = [] if duplicate_content_count > 0: warning_parts.append(f"{duplicate_content_count} duplicate") @@ -555,7 +415,6 @@ async def index_google_gmail_messages( total_processed = documents_indexed - # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Google Gmail indexing for connector {connector_id}", diff --git a/surfsense_backend/tests/unit/indexing_pipeline/test_document_hashing.py b/surfsense_backend/tests/unit/indexing_pipeline/test_document_hashing.py index fe536b066..d04d8b048 100644 --- a/surfsense_backend/tests/unit/indexing_pipeline/test_document_hashing.py +++ b/surfsense_backend/tests/unit/indexing_pipeline/test_document_hashing.py @@ -3,6 +3,7 @@ import pytest from app.db import DocumentType from app.indexing_pipeline.document_hashing import ( compute_content_hash, + compute_identifier_hash, compute_unique_identifier_hash, ) @@ -61,3 +62,23 @@ def test_different_content_produces_different_content_hash(make_connector_docume doc_a = make_connector_document(source_markdown="Original content") doc_b = make_connector_document(source_markdown="Updated content") assert compute_content_hash(doc_a) != compute_content_hash(doc_b) + + +def test_compute_identifier_hash_matches_connector_doc_hash(make_connector_document): + """Raw-args hash equals ConnectorDocument hash for equivalent inputs.""" + doc = make_connector_document( + document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR, + unique_id="msg-123", + search_space_id=5, + ) + raw_hash = 
compute_identifier_hash("GOOGLE_GMAIL_CONNECTOR", "msg-123", 5) + assert raw_hash == compute_unique_identifier_hash(doc) + + +def test_compute_identifier_hash_differs_for_different_inputs(): + """Different arguments produce different hashes.""" + h1 = compute_identifier_hash("GOOGLE_DRIVE_FILE", "file-1", 1) + h2 = compute_identifier_hash("GOOGLE_DRIVE_FILE", "file-2", 1) + h3 = compute_identifier_hash("GOOGLE_DRIVE_FILE", "file-1", 2) + h4 = compute_identifier_hash("COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "file-1", 1) + assert len({h1, h2, h3, h4}) == 4