feat: enhance Google Drive document handling and UI integration

- Implemented support for both new file_id-based and legacy filename-based hash schemes in document processing.
- Added functions to generate unique identifier hashes and find existing documents with migration support.
- Improved existing document update logic to handle content changes and metadata updates, particularly for Google Drive files.
- Enhanced UI components to display appropriate file icons based on file types in the Google Drive connector.
- Updated document processing functions to accommodate the new connector structure and ensure seamless integration.
This commit is contained in:
Anish Sarkar 2026-01-17 14:57:31 +05:30
parent 7af3d1bc1a
commit 6550c378b2
5 changed files with 397 additions and 104 deletions

View file

@ -49,6 +49,131 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
)
def get_google_drive_unique_identifier(
    connector: dict | None,
    filename: str,
    search_space_id: int,
) -> tuple[str, str | None]:
    """
    Compute the unique-identifier hash pair for a file.

    Google Drive files are keyed by their immutable file_id (stable across
    renames); every other source is keyed by filename.

    Args:
        connector: Optional connector info dict with type and metadata
        filename: The filename (key for non-Google-Drive files, legacy fallback otherwise)
        search_space_id: The search space ID

    Returns:
        Tuple of (primary_hash, legacy_hash or None)
        - For Google Drive: (file_id_based_hash, filename_based_hash for migration)
        - For other sources: (filename_based_hash, None)
    """
    is_drive = bool(connector) and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE
    if is_drive:
        drive_file_id = connector.get("metadata", {}).get("google_drive_file_id")
        if drive_file_id:
            # Stable key: the Drive file_id does not change when the file is renamed.
            id_hash = generate_unique_identifier_hash(
                DocumentType.GOOGLE_DRIVE_FILE, drive_file_id, search_space_id
            )
            # Filename-based hash retained so documents indexed before the
            # file_id scheme can still be found and migrated.
            name_hash = generate_unique_identifier_hash(
                DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id
            )
            return id_hash, name_hash

    # Non-Google-Drive files (or Drive entries without a file_id): filename keying.
    return (
        generate_unique_identifier_hash(DocumentType.FILE, filename, search_space_id),
        None,
    )
async def handle_existing_document_update(
    session: AsyncSession,
    existing_document: Document,
    content_hash: str,
    connector: dict | None,
    filename: str,
    primary_hash: str,
) -> tuple[bool, Document | None]:
    """
    Handle update logic for an existing document.

    Args:
        session: Database session
        existing_document: The existing document found in database
        content_hash: Hash of the new content
        connector: Optional connector info
        filename: Current filename
        primary_hash: The primary hash (file_id based for Google Drive)

    Returns:
        Tuple of (should_skip_processing, document_to_return)
        - (True, document): Content unchanged, just return existing document
        - (False, None): Content changed, need to re-process
    """
    # Check if this document needs hash migration (found via legacy hash)
    migrated = existing_document.unique_identifier_hash != primary_hash
    if migrated:
        existing_document.unique_identifier_hash = primary_hash
        # Fixed: the f-string previously had no placeholder for the file.
        logging.info(f"Migrated document to file_id-based identifier: {filename}")
    # Check if content has changed
    if existing_document.content_hash == content_hash:
        renamed = False
        # Content unchanged - check if we need to update metadata (e.g., filename changed)
        if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
            metadata = connector.get("metadata", {})
            new_name = metadata.get("google_drive_file_name")
            old_name = (existing_document.document_metadata or {}).get("google_drive_file_name")
            if new_name and old_name != new_name:
                # File was renamed - update metadata only, skip expensive processing.
                # NOTE(review): in-place mutation of a JSON column is only tracked
                # if the model uses MutableDict — confirm against the Document model.
                if not existing_document.document_metadata:
                    existing_document.document_metadata = {}
                existing_document.document_metadata["google_drive_file_name"] = new_name
                renamed = True
                # Fixed: old/new names were concatenated with no separator.
                logging.info(
                    f"File renamed in Google Drive: '{old_name}' -> '{new_name}' (no re-processing needed)"
                )
        if migrated or renamed:
            # Persist the hash migration and/or rename even though we skip
            # re-processing; previously a migration without a rename was never
            # committed and was lost when the session ended.
            await session.commit()
        logging.info(f"Document for file {filename} unchanged. Skipping.")
        return True, existing_document
    # Content has changed - need to re-process (caller commits the migration
    # together with the full document update).
    logging.info(f"Content changed for file {filename}. Updating document.")
    return False, None
async def find_existing_document_with_migration(
    session: AsyncSession,
    primary_hash: str,
    legacy_hash: str | None,
) -> Document | None:
    """
    Find an existing document, trying the new hash first and falling back to
    the legacy hash so pre-migration documents are still located.

    Args:
        session: Database session
        primary_hash: The primary hash (file_id based for Google Drive)
        legacy_hash: The legacy hash (filename based) for migration, or None

    Returns:
        Existing document if found, None otherwise
    """
    # Preferred lookup: the new (file_id-based) identifier.
    document = await check_document_by_unique_identifier(session, primary_hash)
    if document is not None or not legacy_hash:
        return document

    # Migration path: fall back to the filename-based identifier.
    document = await check_document_by_unique_identifier(session, legacy_hash)
    if document:
        logging.info("Found legacy document (filename-based hash), will migrate to file_id-based hash")
    return document
async def parse_with_llamacloud_retry(
file_path: str,
estimated_pages: int,
@ -158,6 +283,7 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements: list[LangChainDocument],
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store a file document using Unstructured service.
@ -168,6 +294,7 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements: Processed elements from Unstructured
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -177,29 +304,27 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements
)
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await handle_existing_document_update(
session, existing_document, content_hash, connector, file_name, primary_hash
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -251,10 +376,15 @@ async def add_received_file_document_using_unstructured(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "UNSTRUCTURED",
@ -263,7 +393,7 @@ async def add_received_file_document_using_unstructured(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
@ -288,6 +418,7 @@ async def add_received_file_document_using_llamacloud(
llamacloud_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store document content parsed by LlamaCloud.
@ -298,6 +429,7 @@ async def add_received_file_document_using_llamacloud(
llamacloud_markdown_document: Markdown content from LlamaCloud parsing
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -306,29 +438,27 @@ async def add_received_file_document_using_llamacloud(
# Combine all markdown documents into one
file_in_markdown = llamacloud_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await handle_existing_document_update(
session, existing_document, content_hash, connector, file_name, primary_hash
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -380,10 +510,15 @@ async def add_received_file_document_using_llamacloud(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "LLAMACLOUD",
@ -392,7 +527,7 @@ async def add_received_file_document_using_llamacloud(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
@ -419,6 +554,7 @@ async def add_received_file_document_using_docling(
docling_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store document content parsed by Docling.
@ -429,6 +565,7 @@ async def add_received_file_document_using_docling(
docling_markdown_document: Markdown content from Docling parsing
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -436,29 +573,27 @@ async def add_received_file_document_using_docling(
try:
file_in_markdown = docling_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await handle_existing_document_update(
session, existing_document, content_hash, connector, file_name, primary_hash
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -534,10 +669,15 @@ async def add_received_file_document_using_docling(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "DOCLING",
@ -546,15 +686,15 @@ async def add_received_file_document_using_docling(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
)
session.add(document)
await session.commit()
await session.refresh(document)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
@ -650,7 +790,7 @@ async def process_file_in_background(
# Process markdown directly through specialized function
result = await add_received_markdown_file_document(
session, filename, markdown_content, search_space_id, user_id
session, filename, markdown_content, search_space_id, user_id, connector
)
if connector:
@ -790,7 +930,7 @@ async def process_file_in_background(
# Process transcription as markdown document
result = await add_received_markdown_file_document(
session, filename, transcribed_text, search_space_id, user_id
session, filename, transcribed_text, search_space_id, user_id, connector
)
if connector:
@ -955,7 +1095,7 @@ async def process_file_in_background(
# Pass the documents to the existing background task
result = await add_received_file_document_using_unstructured(
session, filename, docs, search_space_id, user_id
session, filename, docs, search_space_id, user_id, connector
)
if connector:
@ -1103,6 +1243,7 @@ async def process_file_in_background(
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
)
# Track if this document was successfully created
@ -1256,6 +1397,7 @@ async def process_file_in_background(
docling_markdown_document=result["content"],
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
)
if doc_result:

View file

@ -23,12 +23,118 @@ from .base import (
)
def _get_google_drive_unique_identifier(
    connector: dict | None,
    filename: str,
    search_space_id: int,
) -> tuple[str, str | None]:
    """
    Get the unique identifier hash for a file, with special handling for
    Google Drive: Drive files are keyed by file_id (stable across renames),
    all other files by filename.

    Args:
        connector: Optional connector info dict with type and metadata
        filename: The filename (used for non-Google Drive files or as fallback)
        search_space_id: The search space ID

    Returns:
        Tuple of (primary_hash, legacy_hash or None)
    """
    drive_file_id = None
    if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
        drive_file_id = connector.get("metadata", {}).get("google_drive_file_id")

    if drive_file_id:
        # file_id keying plus a filename-based legacy hash for migration.
        return (
            generate_unique_identifier_hash(
                DocumentType.GOOGLE_DRIVE_FILE, drive_file_id, search_space_id
            ),
            generate_unique_identifier_hash(
                DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id
            ),
        )

    # Everything else keeps the filename-based scheme.
    return (
        generate_unique_identifier_hash(DocumentType.FILE, filename, search_space_id),
        None,
    )
async def _find_existing_document_with_migration(
    session: AsyncSession,
    primary_hash: str,
    legacy_hash: str | None,
) -> Document | None:
    """Find existing document, checking both new hash and legacy hash for migration."""
    # New (file_id-based) identifier takes precedence.
    document = await check_document_by_unique_identifier(session, primary_hash)
    if document is None and legacy_hash:
        # Fall back to the filename-based identifier for pre-migration documents.
        document = await check_document_by_unique_identifier(session, legacy_hash)
        if document:
            logging.info("Found legacy document (filename-based hash), will migrate to file_id-based hash")
    return document
async def _handle_existing_document_update(
    session: AsyncSession,
    existing_document: Document,
    content_hash: str,
    connector: dict | None,
    filename: str,
    primary_hash: str,
    task_logger: TaskLoggingService,
    log_entry,
) -> tuple[bool, Document | None]:
    """
    Handle update logic for an existing markdown-file document.

    Args:
        session: Database session
        existing_document: The existing document found in database
        content_hash: Hash of the new content
        connector: Optional connector info
        filename: Current filename
        primary_hash: The primary hash (file_id based for Google Drive)
        task_logger: Task logging service for success records
        log_entry: The task log entry to close on skip

    Returns:
        Tuple of (should_skip_processing, document_to_return)
    """
    # Check if this document needs hash migration (found via legacy hash)
    migrated = existing_document.unique_identifier_hash != primary_hash
    if migrated:
        existing_document.unique_identifier_hash = primary_hash
        # Fixed: the f-string previously had no placeholder for the file.
        logging.info(f"Migrated document to file_id-based identifier: {filename}")
    # Check if content has changed
    if existing_document.content_hash == content_hash:
        renamed = False
        # Content unchanged - check if we need to update metadata (e.g., filename changed)
        if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
            metadata = connector.get("metadata", {})
            new_name = metadata.get("google_drive_file_name")
            old_name = (existing_document.document_metadata or {}).get("google_drive_file_name")
            if new_name and old_name != new_name:
                # NOTE(review): in-place mutation of a JSON column is only tracked
                # if the model uses MutableDict — confirm against the Document model.
                if not existing_document.document_metadata:
                    existing_document.document_metadata = {}
                existing_document.document_metadata["google_drive_file_name"] = new_name
                renamed = True
                # Fixed: old/new names were concatenated with no separator.
                logging.info(
                    f"File renamed in Google Drive: '{old_name}' -> '{new_name}' (no re-processing needed)"
                )
        if migrated or renamed:
            # Persist the hash migration and/or rename even on the skip path;
            # previously a migration without a rename was never committed.
            await session.commit()
        await task_logger.log_task_success(
            log_entry,
            f"Markdown file document unchanged: {filename}",
            {
                "duplicate_detected": True,
                "existing_document_id": existing_document.id,
            },
        )
        logging.info(f"Document for markdown file {filename} unchanged. Skipping.")
        return True, existing_document
    # Content has changed - need to re-process (caller commits the migration
    # together with the full document update).
    logging.info(f"Content changed for markdown file {filename}. Updating document.")
    return False, None
async def add_received_markdown_file_document(
session: AsyncSession,
file_name: str,
file_in_markdown: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store a markdown file document.
@ -39,6 +145,7 @@ async def add_received_markdown_file_document(
file_in_markdown: Content of the markdown file
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -58,39 +165,28 @@ async def add_received_markdown_file_document(
)
try:
# Generate unique identifier hash for this markdown file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = _get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive)
existing_document = await _find_existing_document_with_migration(
session, primary_hash, legacy_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"Markdown file document unchanged: {file_name}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document for markdown file {file_name} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for markdown file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await _handle_existing_document_update(
session, existing_document, content_hash, connector, file_name, primary_hash,
task_logger, log_entry
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -139,10 +235,15 @@ async def add_received_markdown_file_document(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
},
@ -150,7 +251,7 @@ async def add_received_markdown_file_document(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
)