diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index 9e3d856ad..96f0af756 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -31,6 +31,7 @@ from app.utils.document_converters import (
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document,
     get_current_timestamp,
 )
 
 from .markdown_processor import add_received_markdown_file_document
@@ -126,15 +127,22 @@ async def handle_existing_document_update(
     if existing_document.content_hash == content_hash:
         # Content unchanged - check if we need to update metadata (e.g., filename changed)
         if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
-            metadata = connector.get("metadata", {})
-            new_name = metadata.get("google_drive_file_name")
-            old_name = (existing_document.document_metadata or {}).get("google_drive_file_name")
+            connector_metadata = connector.get("metadata", {})
+            new_name = connector_metadata.get("google_drive_file_name")
+            # Check both possible keys for old name (FILE_NAME is used in stored documents)
+            doc_metadata = existing_document.document_metadata or {}
+            old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get("google_drive_file_name")
 
-            if new_name and old_name != new_name:
-                # File was renamed - update metadata only, skip expensive processing
+            if new_name and old_name and old_name != new_name:
+                # File was renamed - update title and metadata, skip expensive processing
+                from sqlalchemy.orm.attributes import flag_modified
+
+                existing_document.title = new_name
                 if not existing_document.document_metadata:
                     existing_document.document_metadata = {}
+                existing_document.document_metadata["FILE_NAME"] = new_name
                 existing_document.document_metadata["google_drive_file_name"] = new_name
+                flag_modified(existing_document, "document_metadata")
                 await session.commit()
                 logging.info(f"File renamed in Google Drive: '{old_name}' → '{new_name}' (no re-processing needed)")
@@ -150,14 +158,17 @@ async def find_existing_document_with_migration(
     session: AsyncSession,
     primary_hash: str,
     legacy_hash: str | None,
+    content_hash: str | None = None,
 ) -> Document | None:
     """
-    Find existing document, checking both new hash and legacy hash for migration.
+    Find existing document, checking both new hash and legacy hash for migration,
+    with fallback to content_hash for cross-source deduplication.
 
     Args:
         session: Database session
         primary_hash: The primary hash (file_id based for Google Drive)
         legacy_hash: The legacy hash (filename based) for migration, or None
+        content_hash: The content hash for fallback deduplication, or None
 
     Returns:
         Existing document if found, None otherwise
@@ -171,6 +182,17 @@ async def find_existing_document_with_migration(
     if existing_document:
         logging.info("Found legacy document (filename-based hash), will migrate to file_id-based hash")
 
+    # Fallback: check by content_hash to catch duplicates from different sources
+    # This prevents unique constraint violations when the same content exists
+    # under a different unique_identifier (e.g., manual upload vs Google Drive)
+    if not existing_document and content_hash:
+        existing_document = await check_duplicate_document(session, content_hash)
+        if existing_document:
+            logging.info(
+                f"Found duplicate content from different source (content_hash match). "
+                f"Original document ID: {existing_document.id}, type: {existing_document.document_type}"
+            )
+
     return existing_document
@@ -312,9 +334,9 @@ async def add_received_file_document_using_unstructured(
     # Generate content hash
     content_hash = generate_content_hash(file_in_markdown, search_space_id)
 
-    # Check if document exists (with migration support for Google Drive)
+    # Check if document exists (with migration support for Google Drive and content_hash fallback)
     existing_document = await find_existing_document_with_migration(
-        session, primary_hash, legacy_hash
+        session, primary_hash, legacy_hash, content_hash
     )
 
     if existing_document:
@@ -446,9 +468,9 @@ async def add_received_file_document_using_llamacloud(
     # Generate content hash
     content_hash = generate_content_hash(file_in_markdown, search_space_id)
 
-    # Check if document exists (with migration support for Google Drive)
+    # Check if document exists (with migration support for Google Drive and content_hash fallback)
     existing_document = await find_existing_document_with_migration(
-        session, primary_hash, legacy_hash
+        session, primary_hash, legacy_hash, content_hash
     )
 
     if existing_document:
@@ -581,9 +603,9 @@ async def add_received_file_document_using_docling(
     # Generate content hash
     content_hash = generate_content_hash(file_in_markdown, search_space_id)
 
-    # Check if document exists (with migration support for Google Drive)
+    # Check if document exists (with migration support for Google Drive and content_hash fallback)
     existing_document = await find_existing_document_with_migration(
-        session, primary_hash, legacy_hash
+        session, primary_hash, legacy_hash, content_hash
     )
 
     if existing_document:
@@ -599,7 +621,7 @@ async def add_received_file_document_using_docling(
         user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
         if not user_llm:
             raise RuntimeError(
-                f"No long context LLM configured for user {user_id} in search space {search_space_id}"
+                f"No long context LLM configured for user {user_id} in search_space {search_space_id}"
             )
 
     # Generate summary using chunked processing for large documents
diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py
index 5628c1594..e66aa9170 100644
--- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py
+++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py
@@ -19,6 +19,7 @@ from app.utils.document_converters import (
 from .base import (
     check_document_by_unique_identifier,
+    check_duplicate_document,
     get_current_timestamp,
 )
 
@@ -65,8 +66,12 @@ async def _find_existing_document_with_migration(
     session: AsyncSession,
     primary_hash: str,
     legacy_hash: str | None,
+    content_hash: str | None = None,
 ) -> Document | None:
-    """Find existing document, checking both new hash and legacy hash for migration."""
+    """
+    Find existing document, checking both new hash and legacy hash for migration,
+    with fallback to content_hash for cross-source deduplication.
+    """
     existing_document = await check_document_by_unique_identifier(session, primary_hash)
 
     if not existing_document and legacy_hash:
@@ -74,6 +79,15 @@ async def _find_existing_document_with_migration(
         existing_document = await check_document_by_unique_identifier(session, legacy_hash)
         if existing_document:
             logging.info("Found legacy document (filename-based hash), will migrate to file_id-based hash")
 
+    # Fallback: check by content_hash to catch duplicates from different sources
+    if not existing_document and content_hash:
+        existing_document = await check_duplicate_document(session, content_hash)
+        if existing_document:
+            logging.info(
+                f"Found duplicate content from different source (content_hash match). "
+                f"Original document ID: {existing_document.id}, type: {existing_document.document_type}"
+            )
+
     return existing_document
@@ -102,14 +116,23 @@ async def _handle_existing_document_update(
     if existing_document.content_hash == content_hash:
         # Content unchanged - check if we need to update metadata (e.g., filename changed)
         if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
-            metadata = connector.get("metadata", {})
-            new_name = metadata.get("google_drive_file_name")
-            old_name = (existing_document.document_metadata or {}).get("google_drive_file_name")
+            connector_metadata = connector.get("metadata", {})
+            new_name = connector_metadata.get("google_drive_file_name")
+            # Check both possible keys for old name (FILE_NAME is used in stored documents)
+            doc_metadata = existing_document.document_metadata or {}
+            old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get("google_drive_file_name") or doc_metadata.get("file_name")
 
-            if new_name and old_name != new_name:
+            if new_name and old_name and old_name != new_name:
+                # File was renamed - update title and metadata, skip expensive processing
+                from sqlalchemy.orm.attributes import flag_modified
+
+                existing_document.title = new_name
                 if not existing_document.document_metadata:
                     existing_document.document_metadata = {}
+                existing_document.document_metadata["FILE_NAME"] = new_name
+                existing_document.document_metadata["file_name"] = new_name
                 existing_document.document_metadata["google_drive_file_name"] = new_name
+                flag_modified(existing_document, "document_metadata")
                 await session.commit()
                 logging.info(f"File renamed in Google Drive: '{old_name}' → '{new_name}' (no re-processing needed)")
@@ -173,9 +196,9 @@ async def add_received_markdown_file_document(
     # Generate content hash
     content_hash = generate_content_hash(file_in_markdown, search_space_id)
 
-    # Check if document exists (with migration support for Google Drive)
+    # Check if document exists (with migration support for Google Drive and content_hash fallback)
     existing_document = await _find_existing_document_with_migration(
-        session, primary_hash, legacy_hash
+        session, primary_hash, legacy_hash, content_hash
    )
 
     if existing_document: