diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index cfc321df1..0e6934e2c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -3,6 +3,10 @@ Obsidian connector indexer. Indexes markdown notes from a local Obsidian vault. This connector is only available in self-hosted mode. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import os @@ -17,7 +21,7 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.config import config -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -34,6 +38,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -307,25 +312,22 @@ async def index_obsidian_vault( logger.info(f"Processing {len(files)} files after date filtering") - # Get LLM for summarization - long_context_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - indexed_count = 0 skipped_count = 0 + failed_count = 0 + duplicate_content_count = 0 # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() + # ======================================================================= + # PHASE 1: Analyze all files, create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + files_to_process = [] # List of dicts with document and file data + new_documents_created = False + for file_info in files: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(indexed_count) - last_heartbeat_time = time.time() try: file_path = file_info["path"] relative_path = file_info["relative_path"] @@ -368,13 +370,143 @@ async def index_obsidian_vault( search_space_id, ) + # Generate content hash + content_hash = generate_content_hash(content, search_space_id) + # Check for existing document existing_document = await check_document_by_unique_identifier( session, unique_identifier_hash ) - # Generate content hash - content_hash = generate_content_hash(content, search_space_id) + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() + logger.debug(f"Note {title} unchanged, skipping") + skipped_count += 1 + continue + + # Queue existing document for update (will be set to processing in Phase 2) + files_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'file_info': file_info, + 'content': content, + 'body_content': body_content, + 'frontmatter': frontmatter, + 'wiki_links': wiki_links, + 'tags': tags, + 'title': title, + 'relative_path': relative_path, + 'content_hash': content_hash, + 'unique_identifier_hash': unique_identifier_hash, + }) + continue + + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Obsidian note {title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + duplicate_content_count += 1 + skipped_count += 1 + continue + + # Create new document with PENDING status (visible in UI immediately) + document = Document( + search_space_id=search_space_id, + title=title, + document_type=DocumentType.OBSIDIAN_CONNECTOR, + document_metadata={ + "vault_name": vault_name, + "file_path": relative_path, + "connector_id": connector_id, + }, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts + updated_at=get_current_timestamp(), + created_by_id=user_id, + connector_id=connector_id, + ) + session.add(document) + new_documents_created = True + + files_to_process.append({ + 'document': document, + 'is_new': True, + 'file_info': file_info, + 'content': content, + 'body_content': body_content, + 'frontmatter': frontmatter, + 'wiki_links': wiki_links, + 'tags': tags, + 'title': title, + 'relative_path': relative_path, + 'content_hash': content_hash, + 'unique_identifier_hash': unique_identifier_hash, + }) + + except Exception as e: + logger.exception( + f"Error in Phase 1 for file {file_info.get('path', 'unknown')}: {e}" + ) + failed_count += 1 + continue + + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(files_to_process)} documents") + + # Get LLM for summarization + long_context_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + for item in files_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(indexed_count) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Extract data from item + title = item['title'] + relative_path = item['relative_path'] + content = item['content'] + body_content = item['body_content'] + frontmatter = item['frontmatter'] + wiki_links = item['wiki_links'] + tags = item['tags'] + content_hash = item['content_hash'] + file_info = item['file_info'] # Build metadata document_metadata = { @@ -404,134 +536,114 @@ async def index_obsidian_vault( ] document_string = build_document_metadata_string(metadata_sections) - if existing_document: - # Check if content has changed - if existing_document.content_hash == content_hash: - logger.debug(f"Note {title} unchanged, skipping") - skipped_count += 1 - continue - - # Update existing document - logger.info(f"Updating note: {title}") - - # Generate new summary if content changed - if long_context_llm: - new_summary, _ = await generate_document_summary( - document_string, - long_context_llm, - document_metadata, - ) - # Store summary in metadata - document_metadata["summary"] = new_summary - - # Add URL and connector_id to metadata - document_metadata["url"] = ( - f"obsidian://{vault_name}/{relative_path}" - ) - document_metadata["connector_id"] = connector_id - - existing_document.content = document_string - existing_document.content_hash = content_hash - existing_document.document_metadata = document_metadata - existing_document.updated_at = get_current_timestamp() - - # Update embedding - embedding = config.embedding_model_instance.embed(document_string) - existing_document.embedding = embedding - - # Update chunks - delete old and create new - existing_document.chunks.clear() - new_chunks = await create_document_chunks(document_string) - existing_document.chunks = new_chunks - - indexed_count += 1 - - else: - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from another connector) - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - - if duplicate_by_content: - logger.info( - f"Obsidian note {title} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." - ) - skipped_count += 1 - continue - - # Create new document - logger.info(f"Indexing new note: {title}") - - # Generate summary - summary_content = "" - if long_context_llm: - summary_content, _ = await generate_document_summary( - document_string, - long_context_llm, - document_metadata, - ) - - # Generate embedding - embedding = config.embedding_model_instance.embed(document_string) - - # Add URL and summary to metadata - document_metadata["url"] = ( - f"obsidian://{vault_name}/{relative_path}" - ) - document_metadata["summary"] = summary_content - document_metadata["connector_id"] = connector_id - - # Create chunks - chunks = await create_document_chunks(document_string) - - # Create document - new_document = Document( - search_space_id=search_space_id, - title=title, - document_type=DocumentType.OBSIDIAN_CONNECTOR, - content=document_string, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - document_metadata=document_metadata, - embedding=embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, + # Generate summary + summary_content = "" + if long_context_llm: + summary_content, _ = await generate_document_summary( + document_string, + long_context_llm, + document_metadata, ) - session.add(new_document) + # Generate embedding + embedding = config.embedding_model_instance.embed(document_string) - indexed_count += 1 + # Add URL and summary to metadata + document_metadata["url"] = f"obsidian://{vault_name}/{relative_path}" + document_metadata["summary"] = summary_content + document_metadata["connector_id"] = connector_id + + # Create chunks + chunks = await create_document_chunks(document_string) + + # Update document to READY with actual content + document.title = title + document.content = document_string + document.content_hash = content_hash + document.embedding = embedding + document.document_metadata = document_metadata + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + indexed_count += 1 + + # Batch commit every 10 documents (for ready status updates) + if indexed_count % 10 == 0: + logger.info( + f"Committing batch: {indexed_count} Obsidian notes processed so far" + ) + await session.commit() except Exception as e: logger.exception( - f"Error processing file {file_info.get('path', 'unknown')}: {e}" + f"Error processing file {item.get('file_info', {}).get('path', 'unknown')}: {e}" ) - skipped_count += 1 + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + failed_count += 1 continue - # Update connector's last indexed timestamp + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs await update_connector_last_indexed(session, connector, update_last_indexed) - # Commit all changes - await session.commit() + # Final commit for any remaining documents not yet committed in batches + logger.info( + f"Final commit: Total {indexed_count} Obsidian notes processed" + ) + try: + await session.commit() + logger.info( + "Successfully committed all Obsidian document changes to database" + ) + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same note was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if failed_count > 0: + warning_parts.append(f"{failed_count} failed") + warning_message = ", ".join(warning_parts) if warning_parts else None + + total_processed = indexed_count await task_logger.log_task_success( log_entry, - f"Successfully indexed {indexed_count} Obsidian notes (skipped {skipped_count})", + f"Successfully completed Obsidian vault indexing for connector {connector_id}", { - "indexed_count": indexed_count, - "skipped_count": skipped_count, - "total_files": len(files), + "notes_processed": total_processed, + "documents_indexed": indexed_count, + "documents_skipped": skipped_count, + "documents_failed": failed_count, + "duplicate_content_count": duplicate_content_count, }, ) - return indexed_count, None + logger.info( + f"Obsidian vault indexing completed: {indexed_count} ready, " + f"{skipped_count} skipped, {failed_count} failed " + f"({duplicate_content_count} duplicate content)" + ) + return total_processed, warning_message except SQLAlchemyError as e: logger.exception(f"Database error during Obsidian indexing: {e}") diff --git a/surfsense_backend/app/tasks/document_processors/base.py b/surfsense_backend/app/tasks/document_processors/base.py index f29207448..c8046868c 100644 --- a/surfsense_backend/app/tasks/document_processors/base.py +++ b/surfsense_backend/app/tasks/document_processors/base.py @@ -14,6 +14,34 @@ from app.db import Document md = MarkdownifyTransformer() +def safe_set_chunks(document: Document, chunks: list) -> None: + """ + Safely assign chunks to a document without triggering lazy loading. + + ALWAYS use this instead of `document.chunks = chunks` to avoid + SQLAlchemy async errors (MissingGreenlet / greenlet_spawn). + + Why this is needed: + - Direct assignment `document.chunks = chunks` triggers SQLAlchemy to + load the OLD chunks first (for comparison/orphan detection) + - This lazy loading fails in async context with asyncpg driver + - set_committed_value bypasses this by setting the value directly + + This function is safe regardless of how the document was loaded + (with or without selectinload). + + Args: + document: The Document object to update + chunks: List of Chunk objects to assign + + Example: + # Instead of: document.chunks = chunks (DANGEROUS!) + safe_set_chunks(document, chunks) # Always safe + """ + from sqlalchemy.orm.attributes import set_committed_value + set_committed_value(document, 'chunks', chunks) + + def get_current_timestamp() -> datetime: """ Get the current timestamp with timezone for updated_at field. diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py index f412b51dd..e9c395c83 100644 --- a/surfsense_backend/app/tasks/document_processors/circleback_processor.py +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -3,6 +3,11 @@ Circleback meeting document processor. This module processes meeting data received from Circleback webhooks and stores it as searchable documents in the database. + +Implements real-time document status updates for UI feedback: +- Create document with 'pending' status (visible in UI immediately) +- Set to 'processing' while processing content +- Set to 'ready' or 'failed' when complete """ import logging @@ -14,6 +19,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import ( Document, + DocumentStatus, DocumentType, SearchSourceConnector, SearchSourceConnectorType, @@ -30,6 +36,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, get_current_timestamp, + safe_set_chunks, ) logger = logging.getLogger(__name__) @@ -47,6 +54,11 @@ async def add_circleback_meeting_document( """ Process and store a Circleback meeting document. + Implements real-time document status updates: + - Phase 1: Create document with 'pending' status (visible in UI immediately) + - Phase 2: Set to 'processing' while processing content + - Phase 3: Set to 'ready' or 'failed' when complete + Args: session: Database session meeting_id: Circleback meeting ID @@ -59,6 +71,7 @@ async def add_circleback_meeting_document( Returns: Document object if successful, None if failed or duplicate """ + document = None try: # Generate unique identifier hash using Circleback meeting ID unique_identifier = f"circleback_{meeting_id}" @@ -77,6 +90,10 @@ async def add_circleback_meeting_document( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() + await session.commit() logger.info(f"Circleback meeting {meeting_id} unchanged. Skipping.") return existing_document else: @@ -84,7 +101,79 @@ async def add_circleback_meeting_document( logger.info( f"Content changed for Circleback meeting {meeting_id}. Updating document." ) + document = existing_document + # Set to PROCESSING status and commit - shows "processing" in UI + document.status = DocumentStatus.processing() + await session.commit() + else: + # ======================================================================= + # PHASE 1: Create document with PENDING status + # This makes the document visible in the UI immediately + # ======================================================================= + + # Fetch the user who set up the Circleback connector (preferred) + # or fall back to search space owner if no connector found + created_by_user_id = None + # Try to find the Circleback connector for this search space + connector_result = await session.execute( + select(SearchSourceConnector.user_id).where( + SearchSourceConnector.search_space_id == search_space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.CIRCLEBACK_CONNECTOR, + ) + ) + connector_user = connector_result.scalar_one_or_none() + + if connector_user: + # Use the user who set up the Circleback connector + created_by_user_id = connector_user + else: + # Fallback: use search space owner if no connector found + search_space_result = await session.execute( + select(SearchSpace.user_id).where(SearchSpace.id == search_space_id) + ) + created_by_user_id = search_space_result.scalar_one_or_none() + + # Create new document with PENDING status (visible in UI immediately) + document = Document( + search_space_id=search_space_id, + title=meeting_name, + document_type=DocumentType.CIRCLEBACK, + document_metadata={ + "CIRCLEBACK_MEETING_ID": meeting_id, + "MEETING_NAME": meeting_name, + "SOURCE": "CIRCLEBACK_WEBHOOK", + "connector_id": connector_id, + }, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready + unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts + content_needs_reindexing=False, + updated_at=get_current_timestamp(), + created_by_id=created_by_user_id, + connector_id=connector_id, + ) + session.add(document) + # Commit immediately so document appears in UI with pending status + await session.commit() + logger.info( + f"Created pending Circleback meeting document {meeting_id} in search space {search_space_id}" + ) + + # ======================================================================= + # PHASE 2: Set to PROCESSING status + # ======================================================================= + document.status = DocumentStatus.processing() + await session.commit() + + # ======================================================================= + # PHASE 3: Process the document content + # ======================================================================= + # Get LLM for generating summary llm = await get_document_summary_llm(session, search_space_id) if not llm: @@ -100,7 +189,7 @@ async def add_circleback_meeting_document( summary_embedding = None else: # Generate summary with metadata - document_metadata = { + summary_metadata = { "meeting_name": meeting_name, "meeting_id": meeting_id, "document_type": "Circleback Meeting", @@ -111,7 +200,7 @@ async def add_circleback_meeting_document( }, } summary_content, summary_embedding = await generate_document_summary( - markdown_content, llm, document_metadata + markdown_content, llm, summary_metadata ) # Process chunks @@ -126,7 +215,7 @@ async def add_circleback_meeting_document( f"Failed to convert Circleback meeting {meeting_id} to BlockNote JSON, document will not be editable" ) - # Prepare document metadata + # Prepare final document metadata document_metadata = { "CIRCLEBACK_MEETING_ID": meeting_id, "MEETING_NAME": meeting_name, @@ -134,77 +223,34 @@ async def add_circleback_meeting_document( **metadata, } - # Fetch the user who set up the Circleback connector (preferred) - # or fall back to search space owner if no connector found - created_by_user_id = None + # ======================================================================= + # PHASE 4: Update document to READY status with actual content + # ======================================================================= + document.title = meeting_name + document.content = summary_content + document.content_hash = content_hash + if summary_embedding is not None: + document.embedding = summary_embedding + document.document_metadata = document_metadata + safe_set_chunks(document, chunks) + document.blocknote_document = blocknote_json + document.content_needs_reindexing = False + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + # Ensure connector_id is set (backfill for documents created before this field) + if connector_id is not None: + document.connector_id = connector_id - # Try to find the Circleback connector for this search space - connector_result = await session.execute( - select(SearchSourceConnector.user_id).where( - SearchSourceConnector.search_space_id == search_space_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.CIRCLEBACK_CONNECTOR, - ) - ) - connector_user = connector_result.scalar_one_or_none() - - if connector_user: - # Use the user who set up the Circleback connector - created_by_user_id = connector_user - else: - # Fallback: use search space owner if no connector found - search_space_result = await session.execute( - select(SearchSpace.user_id).where(SearchSpace.id == search_space_id) - ) - created_by_user_id = search_space_result.scalar_one_or_none() - - # Update or create document + await session.commit() + await session.refresh(document) + if existing_document: - # Update existing document - existing_document.title = meeting_name - existing_document.content = summary_content - existing_document.content_hash = content_hash - if summary_embedding is not None: - existing_document.embedding = summary_embedding - existing_document.document_metadata = document_metadata - existing_document.chunks = chunks - existing_document.blocknote_document = blocknote_json - existing_document.content_needs_reindexing = False - existing_document.updated_at = get_current_timestamp() - # Ensure connector_id is set (backfill for documents created before this field) - if connector_id is not None: - existing_document.connector_id = connector_id - - await session.commit() - await session.refresh(existing_document) - document = existing_document logger.info( f"Updated Circleback meeting document {meeting_id} in search space {search_space_id}" ) else: - # Create new document - document = Document( - search_space_id=search_space_id, - title=meeting_name, - document_type=DocumentType.CIRCLEBACK, - document_metadata=document_metadata, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - blocknote_document=blocknote_json, - content_needs_reindexing=False, - updated_at=get_current_timestamp(), - created_by_id=created_by_user_id, - connector_id=connector_id, - ) - - session.add(document) - await session.commit() - await session.refresh(document) logger.info( - f"Created new Circleback meeting document {meeting_id} in search space {search_space_id}" + f"Processed Circleback meeting document {meeting_id} in search space {search_space_id} - now ready" ) return document @@ -214,8 +260,24 @@ async def add_circleback_meeting_document( logger.error( f"Database error processing Circleback meeting {meeting_id}: {db_error}" ) + # Mark document as failed if it was created + if document is not None: + try: + document.status = DocumentStatus.failed(str(db_error)) + document.updated_at = get_current_timestamp() + await session.commit() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") raise db_error except Exception as e: await session.rollback() logger.error(f"Failed to process Circleback meeting {meeting_id}: {e!s}") + # Mark document as failed if it was created + if document is not None: + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + await session.commit() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") raise RuntimeError(f"Failed to process Circleback meeting: {e!s}") from e