feat: enhance document processing with content hash deduplication

- Added support for content hash fallback in document migration to prevent duplicate entries from different sources.
- Improved existing document update logic to handle renaming and metadata updates more effectively, particularly for Google Drive files.
- Updated functions to check for existing documents with enhanced logging for better traceability of duplicate content detection.
This commit is contained in:
Anish Sarkar 2026-01-17 15:39:36 +05:30
parent 6550c378b2
commit 49efc50767
2 changed files with 65 additions and 20 deletions

View file

@ -31,6 +31,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
)
from .markdown_processor import add_received_markdown_file_document
@ -126,15 +127,22 @@ async def handle_existing_document_update(
if existing_document.content_hash == content_hash:
# Content unchanged - check if we need to update metadata (e.g., filename changed)
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
metadata = connector.get("metadata", {})
new_name = metadata.get("google_drive_file_name")
old_name = (existing_document.document_metadata or {}).get("google_drive_file_name")
connector_metadata = connector.get("metadata", {})
new_name = connector_metadata.get("google_drive_file_name")
# Check both possible keys for old name (FILE_NAME is used in stored documents)
doc_metadata = existing_document.document_metadata or {}
old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get("google_drive_file_name")
if new_name and old_name != new_name:
# File was renamed - update metadata only, skip expensive processing
if new_name and old_name and old_name != new_name:
# File was renamed - update title and metadata, skip expensive processing
from sqlalchemy.orm.attributes import flag_modified
existing_document.title = new_name
if not existing_document.document_metadata:
existing_document.document_metadata = {}
existing_document.document_metadata["FILE_NAME"] = new_name
existing_document.document_metadata["google_drive_file_name"] = new_name
flag_modified(existing_document, "document_metadata")
await session.commit()
logging.info(f"File renamed in Google Drive: '{old_name}''{new_name}' (no re-processing needed)")
@ -150,14 +158,17 @@ async def find_existing_document_with_migration(
session: AsyncSession,
primary_hash: str,
legacy_hash: str | None,
content_hash: str | None = None,
) -> Document | None:
"""
Find existing document, checking both new hash and legacy hash for migration.
Find existing document, checking both new hash and legacy hash for migration,
with fallback to content_hash for cross-source deduplication.
Args:
session: Database session
primary_hash: The primary hash (file_id based for Google Drive)
legacy_hash: The legacy hash (filename based) for migration, or None
content_hash: The content hash for fallback deduplication, or None
Returns:
Existing document if found, None otherwise
@ -171,6 +182,17 @@ async def find_existing_document_with_migration(
if existing_document:
logging.info("Found legacy document (filename-based hash), will migrate to file_id-based hash")
# Fallback: check by content_hash to catch duplicates from different sources
# This prevents unique constraint violations when the same content exists
# under a different unique_identifier (e.g., manual upload vs Google Drive)
if not existing_document and content_hash:
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Found duplicate content from different source (content_hash match). "
f"Original document ID: {existing_document.id}, type: {existing_document.document_type}"
)
return existing_document
@ -312,9 +334,9 @@ async def add_received_file_document_using_unstructured(
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document exists (with migration support for Google Drive)
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
@ -446,9 +468,9 @@ async def add_received_file_document_using_llamacloud(
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document exists (with migration support for Google Drive)
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
@ -581,9 +603,9 @@ async def add_received_file_document_using_docling(
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document exists (with migration support for Google Drive)
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
@ -599,7 +621,7 @@ async def add_received_file_document_using_docling(
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
f"No long context LLM configured for user {user_id} in search_space {search_space_id}"
)
# Generate summary using chunked processing for large documents

View file

@ -19,6 +19,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
)
@ -65,8 +66,12 @@ async def _find_existing_document_with_migration(
session: AsyncSession,
primary_hash: str,
legacy_hash: str | None,
content_hash: str | None = None,
) -> Document | None:
"""Find existing document, checking both new hash and legacy hash for migration."""
"""
Find existing document, checking both new hash and legacy hash for migration,
with fallback to content_hash for cross-source deduplication.
"""
existing_document = await check_document_by_unique_identifier(session, primary_hash)
if not existing_document and legacy_hash:
@ -74,6 +79,15 @@ async def _find_existing_document_with_migration(
if existing_document:
logging.info("Found legacy document (filename-based hash), will migrate to file_id-based hash")
# Fallback: check by content_hash to catch duplicates from different sources
if not existing_document and content_hash:
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Found duplicate content from different source (content_hash match). "
f"Original document ID: {existing_document.id}, type: {existing_document.document_type}"
)
return existing_document
@ -102,14 +116,23 @@ async def _handle_existing_document_update(
if existing_document.content_hash == content_hash:
# Content unchanged - check if we need to update metadata (e.g., filename changed)
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
metadata = connector.get("metadata", {})
new_name = metadata.get("google_drive_file_name")
old_name = (existing_document.document_metadata or {}).get("google_drive_file_name")
connector_metadata = connector.get("metadata", {})
new_name = connector_metadata.get("google_drive_file_name")
# Check both possible keys for old name (FILE_NAME is used in stored documents)
doc_metadata = existing_document.document_metadata or {}
old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get("google_drive_file_name") or doc_metadata.get("file_name")
if new_name and old_name != new_name:
if new_name and old_name and old_name != new_name:
# File was renamed - update title and metadata, skip expensive processing
from sqlalchemy.orm.attributes import flag_modified
existing_document.title = new_name
if not existing_document.document_metadata:
existing_document.document_metadata = {}
existing_document.document_metadata["FILE_NAME"] = new_name
existing_document.document_metadata["file_name"] = new_name
existing_document.document_metadata["google_drive_file_name"] = new_name
flag_modified(existing_document, "document_metadata")
await session.commit()
logging.info(f"File renamed in Google Drive: '{old_name}''{new_name}' (no re-processing needed)")
@ -173,9 +196,9 @@ async def add_received_markdown_file_document(
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document exists (with migration support for Google Drive)
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await _find_existing_document_with_migration(
session, primary_hash, legacy_hash
session, primary_hash, legacy_hash, content_hash
)
if existing_document: