Merge pull request #706 from AnishSarkar22/fix/drive-index

feat: enhance Google Drive indexing
This commit is contained in:
Rohan Verma 2026-01-18 22:15:03 -08:00 committed by GitHub
commit 87a174a1fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 1116 additions and 279 deletions

View file

@ -58,7 +58,7 @@ async def get_changes(
params = {
"pageToken": page_token,
"pageSize": 100,
"fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, size, webViewLink, parents, trashed))",
"fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, md5Checksum, size, webViewLink, parents, trashed))",
"supportsAllDrives": True,
"includeItemsFromAllDrives": True,
}

View file

@ -47,7 +47,7 @@ class GoogleDriveClient:
async def list_files(
self,
query: str = "",
fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, size, webViewLink, parents, owners, createdTime, description)",
fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, md5Checksum, size, webViewLink, parents, owners, createdTime, description)",
page_size: int = 100,
page_token: str | None = None,
) -> tuple[list[dict[str, Any]], str | None, str | None]:

View file

@ -102,6 +102,8 @@ async def download_and_process_file(
connector_info["metadata"]["file_size"] = file["size"]
if "webViewLink" in file:
connector_info["metadata"]["web_view_link"] = file["webViewLink"]
if "md5Checksum" in file:
connector_info["metadata"]["md5_checksum"] = file["md5Checksum"]
if is_google_workspace_file(mime_type):
connector_info["metadata"]["exported_as"] = "pdf"

View file

@ -157,7 +157,7 @@ async def get_file_by_id(
try:
file, error = await client.get_file_metadata(
file_id,
fields="id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink",
fields="id, name, mimeType, parents, createdTime, modifiedTime, md5Checksum, size, webViewLink, iconLink",
)
if error:
@ -228,7 +228,7 @@ async def list_folder_contents(
while True:
items, next_token, error = await client.list_files(
query=query,
fields="files(id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink)",
fields="files(id, name, mimeType, parents, createdTime, modifiedTime, md5Checksum, size, webViewLink, iconLink)",
page_size=1000, # Max allowed by Google Drive API
page_token=page_token,
)

View file

@ -1716,7 +1716,7 @@ async def run_google_drive_indexing(
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict, # Dictionary with 'folders' and 'files' lists
items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options'
):
"""Runs the Google Drive indexing task for folders and files with notifications."""
from uuid import UUID
@ -1730,6 +1730,7 @@ async def run_google_drive_indexing(
# Parse the structured data
items = GoogleDriveIndexRequest(**items_dict)
indexing_options = items.indexing_options
total_indexed = 0
errors = []
@ -1765,7 +1766,7 @@ async def run_google_drive_indexing(
stage="fetching",
)
# Index each folder
# Index each folder with indexing options
for folder in items.folders:
try:
indexed_count, error_message = await index_google_drive_files(
@ -1775,8 +1776,10 @@ async def run_google_drive_indexing(
user_id,
folder_id=folder.id,
folder_name=folder.name,
use_delta_sync=True,
use_delta_sync=indexing_options.incremental_sync,
update_last_indexed=False,
max_files=indexing_options.max_files_per_folder,
include_subfolders=indexing_options.include_subfolders,
)
if error_message:
errors.append(f"Folder '{folder.name}': {error_message}")
@ -1837,6 +1840,8 @@ async def run_google_drive_indexing(
# Update notification on completion
if notification:
# Refresh notification to reload attributes that may have been expired by earlier commits
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,

View file

@ -10,7 +10,7 @@ from .documents import (
ExtensionDocumentMetadata,
PaginatedResponse,
)
from .google_drive import DriveItem, GoogleDriveIndexRequest
from .google_drive import DriveItem, GoogleDriveIndexingOptions, GoogleDriveIndexRequest
from .logs import LogBase, LogCreate, LogFilter, LogRead, LogUpdate
from .new_chat import (
ChatMessage,
@ -94,6 +94,7 @@ __all__ = [
"ExtensionDocumentMetadata",
"GlobalNewLLMConfigRead",
"GoogleDriveIndexRequest",
"GoogleDriveIndexingOptions",
# Base schemas
"IDModel",
# RBAC schemas

View file

@ -10,6 +10,25 @@ class DriveItem(BaseModel):
name: str = Field(..., description="Item display name")
class GoogleDriveIndexingOptions(BaseModel):
    """Indexing options for Google Drive connector.

    Every field declares a default, so the model can be constructed with no
    arguments.
    """

    # Per-folder cap on indexed files; validated to the inclusive range 1..1000.
    max_files_per_folder: int = Field(
        default=100,
        ge=1,
        le=1000,
        description="Maximum number of files to index from each folder (1-1000)",
    )
    # True: only sync changes since the last index. False: full re-index.
    incremental_sync: bool = Field(
        default=True,
        description="Only sync changes since last index (faster). Disable for a full re-index.",
    )
    # Whether subfolders of the selected folders are indexed recursively.
    include_subfolders: bool = Field(
        default=True,
        description="Recursively index files in subfolders of selected folders",
    )
class GoogleDriveIndexRequest(BaseModel):
"""Request body for indexing Google Drive content."""
@ -19,6 +38,10 @@ class GoogleDriveIndexRequest(BaseModel):
files: list[DriveItem] = Field(
default_factory=list, description="List of specific files to index"
)
indexing_options: GoogleDriveIndexingOptions = Field(
default_factory=GoogleDriveIndexingOptions,
description="Indexing configuration options",
)
def has_items(self) -> bool:
"""Check if any items are selected."""

View file

@ -461,7 +461,7 @@ def index_google_drive_files_task(
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict, # Dictionary with 'folders' and 'files' lists
items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options'
):
"""Celery task to index Google Drive folders and files."""
import asyncio
@ -486,7 +486,7 @@ async def _index_google_drive_files(
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict, # Dictionary with 'folders' and 'files' lists
items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options'
):
"""Index Google Drive folders and files with new session."""
from app.routes.search_source_connectors_routes import (

View file

@ -72,6 +72,7 @@ async def _check_and_trigger_schedules():
index_elasticsearch_documents_task,
index_github_repos_task,
index_google_calendar_events_task,
index_google_drive_files_task,
index_google_gmail_messages_task,
index_jira_issues_task,
index_linear_issues_task,
@ -96,6 +97,7 @@ async def _check_and_trigger_schedules():
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,
}
# Trigger indexing for each due connector
@ -106,13 +108,57 @@ async def _check_and_trigger_schedules():
f"Triggering periodic indexing for connector {connector.id} "
f"({connector.connector_type.value})"
)
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
None, # start_date - uses last_indexed_at
None, # end_date - uses now
)
# Special handling for Google Drive - uses config for folder/file selection
if (
connector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
):
connector_config = connector.config or {}
selected_folders = connector_config.get("selected_folders", [])
selected_files = connector_config.get("selected_files", [])
indexing_options = connector_config.get(
"indexing_options",
{
"max_files_per_folder": 100,
"incremental_sync": True,
"include_subfolders": True,
},
)
if selected_folders or selected_files:
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
{
"folders": selected_folders,
"files": selected_files,
"indexing_options": indexing_options,
},
)
else:
# No folders/files selected - skip indexing but still update next_scheduled_at
# to prevent checking every minute
logger.info(
f"Google Drive connector {connector.id} has no folders or files selected, "
"skipping periodic indexing (will check again at next scheduled time)"
)
from datetime import timedelta
connector.next_scheduled_at = now + timedelta(
minutes=connector.indexing_frequency_minutes
)
await session.commit()
continue
else:
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
None, # start_date - uses last_indexed_at
None, # end_date - uses now
)
# Update next_scheduled_at for next run
from datetime import timedelta

View file

@ -37,6 +37,7 @@ async def index_google_drive_files(
use_delta_sync: bool = True,
update_last_indexed: bool = True,
max_files: int = 500,
include_subfolders: bool = False,
) -> tuple[int, str | None]:
"""
Index Google Drive files for a specific connector.
@ -51,6 +52,7 @@ async def index_google_drive_files(
use_delta_sync: Whether to use change tracking for incremental sync
update_last_indexed: Whether to update last_indexed_at timestamp
max_files: Maximum number of files to index
include_subfolders: Whether to recursively index files in subfolders
Returns:
Tuple of (number_of_indexed_files, error_message)
@ -144,6 +146,7 @@ async def index_google_drive_files(
task_logger=task_logger,
log_entry=log_entry,
max_files=max_files,
include_subfolders=include_subfolders,
)
else:
logger.info(f"Using full scan for connector {connector_id}")
@ -159,6 +162,7 @@ async def index_google_drive_files(
task_logger=task_logger,
log_entry=log_entry,
max_files=max_files,
include_subfolders=include_subfolders,
)
documents_indexed, documents_skipped = result
@ -168,6 +172,9 @@ async def index_google_drive_files(
if new_token and not token_error:
from sqlalchemy.orm.attributes import flag_modified
# Refresh connector to reload attributes that may have been expired by earlier commits
await session.refresh(connector)
if "folder_tokens" not in connector.config:
connector.config["folder_tokens"] = {}
connector.config["folder_tokens"][target_folder_id] = new_token
@ -375,60 +382,89 @@ async def _index_full_scan(
task_logger: TaskLoggingService,
log_entry: any,
max_files: int,
include_subfolders: bool = False,
) -> tuple[int, int]:
"""Perform full scan indexing of a folder."""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
{"stage": "full_scan", "folder_id": folder_id},
f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
{
"stage": "full_scan",
"folder_id": folder_id,
"include_subfolders": include_subfolders,
},
)
documents_indexed = 0
documents_skipped = 0
page_token = None
files_processed = 0
while files_processed < max_files:
files, next_token, error = await get_files_in_folder(
drive_client, folder_id, include_subfolders=False, page_token=page_token
)
# Queue of folders to process: (folder_id, folder_name)
folders_to_process = [(folder_id, folder_name)]
if error:
logger.error(f"Error listing files: {error}")
break
while folders_to_process and files_processed < max_files:
current_folder_id, current_folder_name = folders_to_process.pop(0)
logger.info(f"Processing folder: {current_folder_name} ({current_folder_id})")
page_token = None
if not files:
break
for file in files:
if files_processed >= max_files:
break
files_processed += 1
indexed, skipped = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
while files_processed < max_files:
# Get files and folders in current folder
# include_subfolders=True here so we get folder items to queue them
files, next_token, error = await get_files_in_folder(
drive_client,
current_folder_id,
include_subfolders=True,
page_token=page_token,
)
documents_indexed += indexed
documents_skipped += skipped
if error:
logger.error(f"Error listing files in {current_folder_name}: {error}")
break
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
if not files:
break
for file in files:
if files_processed >= max_files:
break
mime_type = file.get("mimeType", "")
# If this is a folder and include_subfolders is enabled, queue it for processing
if mime_type == "application/vnd.google-apps.folder":
if include_subfolders:
folders_to_process.append(
(file["id"], file.get("name", "Unknown"))
)
logger.debug(f"Queued subfolder: {file.get('name', 'Unknown')}")
continue
# Process the file
files_processed += 1
indexed, skipped = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
)
page_token = next_token
if not page_token:
break
documents_indexed += indexed
documents_skipped += skipped
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
)
page_token = next_token
if not page_token:
break
logger.info(
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
@ -448,8 +484,13 @@ async def _index_with_delta_sync(
task_logger: TaskLoggingService,
log_entry: any,
max_files: int,
include_subfolders: bool = False,
) -> tuple[int, int]:
"""Perform delta sync indexing using change tracking."""
"""Perform delta sync indexing using change tracking.
Note: include_subfolders is accepted for API consistency but delta sync
automatically tracks changes across all folders including subfolders.
"""
await task_logger.log_task_progress(
log_entry,
f"Starting delta sync from token: {start_page_token[:20]}...",
@ -515,6 +556,131 @@ async def _index_with_delta_sync(
return documents_indexed, documents_skipped
async def _check_rename_only_update(
    session: AsyncSession,
    file: dict,
    search_space_id: int,
) -> tuple[bool, str | None]:
    """
    Check if a file only needs a rename update (no content change).

    Content identity is decided by comparing the incoming ``md5Checksum`` with
    the ``md5_checksum`` stored in the document metadata. When the incoming
    file has no checksum, ``modifiedTime`` is compared with the stored
    ``modified_time`` instead. A positive result lets the caller skip the
    download/ETL step for this file.

    Args:
        session: Database session
        file: File metadata from Google Drive API (reads ``id``, ``name``,
            ``md5Checksum`` and ``modifiedTime``)
        search_space_id: ID of the search space

    Returns:
        Tuple of (is_rename_only, message)
        - (True, message): Content is unchanged. If the name differed, the
          stored title/metadata were updated and committed; the message then
          starts with "File renamed:". Otherwise the message reports that the
          file is unchanged.
        - (False, None): Content changed, the file is new, or the stored
          metadata lacks the value needed to compare - needs full processing.
    """
    from sqlalchemy import select
    from sqlalchemy.orm.attributes import flag_modified

    from app.db import Document

    file_id = file.get("id")
    file_name = file.get("name", "Unknown")
    incoming_md5 = file.get("md5Checksum")  # None for Google Workspace files
    incoming_modified_time = file.get("modifiedTime")

    if not file_id:
        return False, None

    # Try to find existing document by file_id-based hash (primary method)
    primary_hash = generate_unique_identifier_hash(
        DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
    )
    existing_document = await check_document_by_unique_identifier(session, primary_hash)

    # If not found by primary hash, try searching by metadata (for legacy documents)
    if not existing_document:
        result = await session.execute(
            select(Document).where(
                Document.search_space_id == search_space_id,
                Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
                Document.document_metadata["google_drive_file_id"].astext == file_id,
            )
        )
        # Nothing visible here guarantees that at most one legacy row carries
        # a given Drive file id, so take the first match instead of using
        # scalar_one_or_none(), which raises MultipleResultsFound on duplicates.
        existing_document = result.scalars().first()
        if existing_document:
            logger.debug(f"Found legacy document by metadata for file_id: {file_id}")

    if not existing_document:
        # New file, needs full processing
        return False, None

    # Get stored checksums/timestamps from document metadata
    doc_metadata = existing_document.document_metadata or {}
    stored_md5 = doc_metadata.get("md5_checksum")
    stored_modified_time = doc_metadata.get("modified_time")

    # Determine if content changed using md5Checksum (preferred) or modifiedTime (fallback)
    if incoming_md5:
        if not stored_md5:
            # Have incoming md5 but no stored md5 (legacy doc) - need to reprocess to store it
            logger.debug(
                f"No stored md5 for {file_name}, will reprocess to store md5_checksum"
            )
            return False, None
        # Best case: compare md5 checksums (a rename does not alter the checksum)
        content_unchanged = incoming_md5 == stored_md5
        logger.debug(f"MD5 comparison for {file_name}: unchanged={content_unchanged}")
    else:
        # Google Workspace file (no md5Checksum available) - fall back to modifiedTime.
        # Note: modifiedTime is less reliable as it changes on rename too, but it's
        # the best we have.
        if not (incoming_modified_time and stored_modified_time):
            # No stored modifiedTime (legacy) - reprocess to store it
            return False, None
        content_unchanged = incoming_modified_time == stored_modified_time
        logger.debug(
            f"ModifiedTime fallback for Google Workspace file {file_name}: unchanged={content_unchanged}"
        )

    if not content_unchanged:
        # Content changed - needs full processing
        return False, None

    # Content hasn't changed - check if filename changed
    old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get(
        "google_drive_file_name"
    )
    if not old_name or old_name == file_name:
        # Neither content nor name changed
        logger.debug(f"File unchanged: {file_name}")
        return True, "File unchanged (same content and name)"

    # Rename-only update - update the document without re-processing
    existing_document.title = file_name
    if not existing_document.document_metadata:
        existing_document.document_metadata = {}
    existing_document.document_metadata["FILE_NAME"] = file_name
    existing_document.document_metadata["google_drive_file_name"] = file_name
    # Keep the stored modified_time in step with Drive so later comparisons match
    if incoming_modified_time:
        existing_document.document_metadata["modified_time"] = incoming_modified_time
    # In-place mutation of the metadata dict must be flagged explicitly so the
    # change is included in the commit.
    flag_modified(existing_document, "document_metadata")
    await session.commit()

    logger.info(
        f"Rename-only update: '{old_name}' -> '{file_name}' (skipped ETL)"
    )
    # The caller looks for the word "renamed" in this message to count the
    # file as indexed, so the "File renamed:" prefix must be preserved.
    return (
        True,
        f"File renamed: '{old_name}' -> '{file_name}' (no content change)",
    )
async def _process_single_file(
drive_client: GoogleDriveClient,
session: AsyncSession,
@ -537,6 +703,27 @@ async def _process_single_file(
try:
logger.info(f"Processing file: {file_name} ({mime_type})")
# Early check: Is this a rename-only update?
# This optimization prevents downloading and ETL processing for files
# where only the name changed but content is the same.
is_rename_only, rename_message = await _check_rename_only_update(
session=session,
file=file,
search_space_id=search_space_id,
)
if is_rename_only:
await task_logger.log_task_progress(
log_entry,
f"Skipped ETL for {file_name}: {rename_message}",
{"status": "rename_only", "reason": rename_message},
)
# Return 1 for renamed files (they are "indexed" in the sense that they're updated)
# Return 0 for unchanged files
if "renamed" in (rename_message or "").lower():
return 1, 0
return 0, 1
_, error, _ = await download_and_process_file(
client=drive_client,
file=file,
@ -564,7 +751,15 @@ async def _process_single_file(
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):
"""Remove a document that was deleted in Drive."""
"""Remove a document that was deleted in Drive.
Handles both new (file_id-based) and legacy (filename-based) hash schemes.
"""
from sqlalchemy import select
from app.db import Document
# First try with file_id-based hash (new method)
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
@ -573,6 +768,19 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
session, unique_identifier_hash
)
# If not found, search by metadata (for legacy documents with filename-based hash)
if not existing_document:
result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
Document.document_metadata["google_drive_file_id"].astext == file_id,
)
)
existing_document = result.scalar_one_or_none()
if existing_document:
logger.info(f"Found legacy document by metadata for file_id: {file_id}")
if existing_document:
await session.delete(existing_document)
logger.info(f"Removed deleted file document: {file_id}")

View file

@ -31,6 +31,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
)
from .markdown_processor import add_received_markdown_file_document
@ -49,6 +50,160 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
)
def get_google_drive_unique_identifier(
    connector: dict | None,
    filename: str,
    search_space_id: int,
) -> tuple[str, str | None]:
    """
    Build the unique-identifier hash(es) used to look a file up in the database.

    Google Drive files that carry a ``google_drive_file_id`` in the connector
    metadata are keyed by that id; everything else is keyed by filename.

    Args:
        connector: Optional connector info dict with ``type`` and ``metadata``
        filename: The filename (hash input for non-Drive files and for the
            legacy Drive hash)
        search_space_id: The search space ID

    Returns:
        Tuple of (primary_hash, legacy_hash or None)
        - Google Drive with a file id: (file_id-based hash, filename-based
          hash kept for migrating older documents)
        - Anything else: (filename-based FILE hash, None)
    """
    drive_file_id = None
    if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
        drive_file_id = connector.get("metadata", {}).get("google_drive_file_id")

    if not drive_file_id:
        # Not a Drive file (or no id available): key by filename as before.
        return (
            generate_unique_identifier_hash(
                DocumentType.FILE, filename, search_space_id
            ),
            None,
        )

    # Primary key is derived from the Drive file id; the second hash is the
    # older filename-based key so previously indexed documents can be found.
    by_file_id = generate_unique_identifier_hash(
        DocumentType.GOOGLE_DRIVE_FILE, drive_file_id, search_space_id
    )
    by_filename = generate_unique_identifier_hash(
        DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id
    )
    return by_file_id, by_filename
async def handle_existing_document_update(
    session: AsyncSession,
    existing_document: Document,
    content_hash: str,
    connector: dict | None,
    filename: str,
    primary_hash: str,
) -> tuple[bool, Document | None]:
    """
    Handle update logic for an existing document.

    Args:
        session: Database session
        existing_document: The existing document found in database
        content_hash: Hash of the new content
        connector: Optional connector info
        filename: Current filename
        primary_hash: The primary hash (file_id based for Google Drive)

    Returns:
        Tuple of (should_skip_processing, document_to_return)
        - (True, document): Content unchanged, just return existing document
          (its title/metadata may have been updated for a Drive rename)
        - (False, None): Content changed, need to re-process
    """
    # Re-key the document whenever it was located under a different identifier
    # than the primary one. The assignment is not committed here; it is
    # persisted by whichever commit on this session comes next.
    if existing_document.unique_identifier_hash != primary_hash:
        existing_document.unique_identifier_hash = primary_hash
        logging.info(f"Migrated document to file_id-based identifier: {filename}")

    if existing_document.content_hash != content_hash:
        # Content has changed - need to re-process
        logging.info(f"Content changed for file {filename}. Updating document.")
        return False, None

    # Content unchanged - check if we need to update metadata (e.g., filename changed)
    if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
        new_name = connector.get("metadata", {}).get("google_drive_file_name")
        # Check both possible keys for old name (FILE_NAME is used in stored documents)
        doc_metadata = existing_document.document_metadata or {}
        old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get(
            "google_drive_file_name"
        )

        if new_name and old_name and old_name != new_name:
            # File was renamed - update title and metadata, skip expensive processing
            from sqlalchemy.orm.attributes import flag_modified

            existing_document.title = new_name
            if not existing_document.document_metadata:
                existing_document.document_metadata = {}
            existing_document.document_metadata["FILE_NAME"] = new_name
            existing_document.document_metadata["google_drive_file_name"] = new_name
            # In-place dict mutation must be flagged so it is written on commit.
            flag_modified(existing_document, "document_metadata")
            await session.commit()
            logging.info(
                f"File renamed in Google Drive: '{old_name}' -> '{new_name}' (no re-processing needed)"
            )

    logging.info(f"Document for file {filename} unchanged. Skipping.")
    return True, existing_document
async def find_existing_document_with_migration(
    session: AsyncSession,
    primary_hash: str,
    legacy_hash: str | None,
    content_hash: str | None = None,
) -> Document | None:
    """
    Locate an already-stored document by trying up to three lookups in order:
    primary identifier hash, legacy identifier hash, then content hash.

    Args:
        session: Database session
        primary_hash: The primary hash (file_id based for Google Drive)
        legacy_hash: The legacy hash (filename based) for migration, or None
        content_hash: The content hash for fallback deduplication, or None

    Returns:
        The first document found by the lookups above, None if none matched
    """
    # 1) Current identifier scheme.
    document = await check_document_by_unique_identifier(session, primary_hash)
    if document:
        return document

    # 2) Older filename-based identifier (migration path).
    if legacy_hash:
        document = await check_document_by_unique_identifier(session, legacy_hash)
        if document:
            logging.info(
                "Found legacy document (filename-based hash), will migrate to file_id-based hash"
            )
            return document

    # 3) Same content stored under a different identifier (e.g., manual upload
    #    vs Google Drive); finding it here avoids a unique constraint violation
    #    on insert.
    if content_hash:
        document = await check_duplicate_document(session, content_hash)
        if document:
            logging.info(
                f"Found duplicate content from different source (content_hash match). "
                f"Original document ID: {document.id}, type: {document.document_type}"
            )

    return document
async def parse_with_llamacloud_retry(
file_path: str,
estimated_pages: int,
@ -158,6 +313,7 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements: list[LangChainDocument],
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store a file document using Unstructured service.
@ -168,6 +324,7 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements: Processed elements from Unstructured
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -177,29 +334,32 @@ async def add_received_file_document_using_unstructured(
unstructured_processed_elements
)
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await handle_existing_document_update(
session,
existing_document,
content_hash,
connector,
file_name,
primary_hash,
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -251,10 +411,15 @@ async def add_received_file_document_using_unstructured(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "UNSTRUCTURED",
@ -263,7 +428,7 @@ async def add_received_file_document_using_unstructured(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
@ -288,6 +453,7 @@ async def add_received_file_document_using_llamacloud(
llamacloud_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store document content parsed by LlamaCloud.
@ -298,6 +464,7 @@ async def add_received_file_document_using_llamacloud(
llamacloud_markdown_document: Markdown content from LlamaCloud parsing
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -306,29 +473,32 @@ async def add_received_file_document_using_llamacloud(
# Combine all markdown documents into one
file_in_markdown = llamacloud_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await handle_existing_document_update(
session,
existing_document,
content_hash,
connector,
file_name,
primary_hash,
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -380,10 +550,15 @@ async def add_received_file_document_using_llamacloud(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "LLAMACLOUD",
@ -392,7 +567,7 @@ async def add_received_file_document_using_llamacloud(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
@ -419,6 +594,7 @@ async def add_received_file_document_using_docling(
docling_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store document content parsed by Docling.
@ -429,6 +605,7 @@ async def add_received_file_document_using_docling(
docling_markdown_document: Markdown content from Docling parsing
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -436,35 +613,38 @@ async def add_received_file_document_using_docling(
try:
file_in_markdown = docling_markdown_document
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logging.info(f"Document for file {file_name} unchanged. Skipping.")
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await handle_existing_document_update(
session,
existing_document,
content_hash,
connector,
file_name,
primary_hash,
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
f"No long context LLM configured for user {user_id} in search_space {search_space_id}"
)
# Generate summary using chunked processing for large documents
@ -534,10 +714,15 @@ async def add_received_file_document_using_docling(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
"ETL_SERVICE": "DOCLING",
@ -546,15 +731,15 @@ async def add_received_file_document_using_docling(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
)
session.add(document)
await session.commit()
await session.refresh(document)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
@ -650,7 +835,7 @@ async def process_file_in_background(
# Process markdown directly through specialized function
result = await add_received_markdown_file_document(
session, filename, markdown_content, search_space_id, user_id
session, filename, markdown_content, search_space_id, user_id, connector
)
if connector:
@ -790,7 +975,7 @@ async def process_file_in_background(
# Process transcription as markdown document
result = await add_received_markdown_file_document(
session, filename, transcribed_text, search_space_id, user_id
session, filename, transcribed_text, search_space_id, user_id, connector
)
if connector:
@ -955,7 +1140,7 @@ async def process_file_in_background(
# Pass the documents to the existing background task
result = await add_received_file_document_using_unstructured(
session, filename, docs, search_space_id, user_id
session, filename, docs, search_space_id, user_id, connector
)
if connector:
@ -1103,6 +1288,7 @@ async def process_file_in_background(
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
)
# Track if this document was successfully created
@ -1256,6 +1442,7 @@ async def process_file_in_background(
docling_markdown_document=result["content"],
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
)
if doc_result:

View file

@ -19,16 +19,157 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
)
def _get_google_drive_unique_identifier(
    connector: dict | None,
    filename: str,
    search_space_id: int,
) -> tuple[str, str | None]:
    """
    Build the unique-identifier hash(es) used to look up a file's document.

    A Google Drive file that carries a ``google_drive_file_id`` in its
    connector metadata is keyed by that file ID (which doesn't change when the
    file is renamed). A second, filename-based hash is returned alongside it so
    callers can still find a document that was stored under the filename key.
    Any other file is keyed by its filename only.

    Args:
        connector: Optional connector info dict; its ``type`` and
            ``metadata["google_drive_file_id"]`` entries are read here
        filename: The filename (sole key for non-Google Drive files, legacy
            key for Google Drive files)
        search_space_id: The search space ID mixed into every hash

    Returns:
        Tuple of (primary_hash, legacy_hash); legacy_hash is None unless the
        file is a Google Drive file with a file ID
    """
    drive_file_id = None
    if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
        drive_file_id = connector.get("metadata", {}).get("google_drive_file_id")

    # No usable Drive file ID: fall back to the plain filename-based key.
    if not drive_file_id:
        filename_key = generate_unique_identifier_hash(
            DocumentType.FILE, filename, search_space_id
        )
        return filename_key, None

    id_based_key = generate_unique_identifier_hash(
        DocumentType.GOOGLE_DRIVE_FILE, drive_file_id, search_space_id
    )
    name_based_key = generate_unique_identifier_hash(
        DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id
    )
    return id_based_key, name_based_key
async def _find_existing_document_with_migration(
    session: AsyncSession,
    primary_hash: str,
    legacy_hash: str | None,
    content_hash: str | None = None,
) -> Document | None:
    """
    Look up an already-stored document for a file, trying three keys in order.

    1. ``primary_hash`` - the current unique identifier.
    2. ``legacy_hash`` (if given) - the older filename-based identifier, so
       the caller can migrate the document to the new key.
    3. ``content_hash`` (if given) - catches the same content arriving from a
       different source.

    Args:
        session: Database session passed through to the lookup helpers
        primary_hash: Current unique identifier hash
        legacy_hash: Optional filename-based identifier hash
        content_hash: Optional content hash used as the last-resort lookup

    Returns:
        The first matching document, or the (empty) result of the last lookup
        that was attempted
    """
    found = await check_document_by_unique_identifier(session, primary_hash)
    if found:
        return found

    if legacy_hash:
        found = await check_document_by_unique_identifier(session, legacy_hash)
        if found:
            logging.info(
                "Found legacy document (filename-based hash), will migrate to file_id-based hash"
            )
            return found

    # Fallback: check by content_hash to catch duplicates from different sources
    if content_hash:
        found = await check_duplicate_document(session, content_hash)
        if found:
            logging.info(
                f"Found duplicate content from different source (content_hash match). "
                f"Original document ID: {found.id}, type: {found.document_type}"
            )

    return found
async def _handle_existing_document_update(
    session: AsyncSession,
    existing_document: Document,
    content_hash: str,
    connector: dict | None,
    filename: str,
    primary_hash: str,
    task_logger: TaskLoggingService,
    log_entry,
) -> tuple[bool, Document | None]:
    """
    Decide what to do with a markdown file whose document already exists.

    Steps, in order:
      1. If the stored unique identifier differs from ``primary_hash``, point
         the document at ``primary_hash``.
      2. If the content hash differs, tell the caller to re-process.
      3. Otherwise the content is unchanged. For a Google Drive file whose
         name changed, update the title and name metadata and commit, then
         log task success and tell the caller to skip processing.

    Args:
        session: Database session; committed here only on the rename path
        existing_document: The document found for this file
        content_hash: Content hash of the incoming file
        connector: Optional connector info dict (``type`` and ``metadata``)
        filename: Name of the incoming file, used in log messages
        primary_hash: Unique identifier hash the document should carry
        task_logger: Task logging service used to record the "unchanged" result
        log_entry: Log entry handed back to ``task_logger``

    Returns:
        Tuple of (should_skip_processing, document_to_return):
        ``(True, existing_document)`` when content is unchanged,
        ``(False, None)`` when the caller must continue with an update
    """
    # Check if this document needs hash migration.
    # NOTE(review): this only assigns the attribute; on the "unchanged, no
    # rename" path nothing in this function commits it, so persistence depends
    # on a later commit by the caller - confirm that is intended.
    if existing_document.unique_identifier_hash != primary_hash:
        existing_document.unique_identifier_hash = primary_hash
        logging.info(f"Migrated document to file_id-based identifier: {filename}")

    # Content changed: the caller performs the (expensive) update itself.
    if existing_document.content_hash != content_hash:
        logging.info(
            f"Content changed for markdown file {filename}. Updating document."
        )
        return False, None

    # Content unchanged - check if we need to update metadata (e.g., filename changed)
    if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
        new_name = connector.get("metadata", {}).get("google_drive_file_name")

        # Check every key the old name may be stored under (FILE_NAME is used
        # in stored documents).
        doc_metadata = existing_document.document_metadata or {}
        old_name = (
            doc_metadata.get("FILE_NAME")
            or doc_metadata.get("google_drive_file_name")
            or doc_metadata.get("file_name")
        )

        if new_name and old_name and old_name != new_name:
            # File was renamed - update title and metadata, skip expensive processing
            from sqlalchemy.orm.attributes import flag_modified

            existing_document.title = new_name
            # A truthy old_name can only come from a non-empty stored mapping,
            # so doc_metadata is the document's own metadata object here and
            # no "create an empty dict first" guard is needed.
            doc_metadata["FILE_NAME"] = new_name
            doc_metadata["file_name"] = new_name
            doc_metadata["google_drive_file_name"] = new_name
            # In-place edits are flagged explicitly so the change is written
            # on commit.
            flag_modified(existing_document, "document_metadata")
            await session.commit()
            logging.info(
                f"File renamed in Google Drive: '{old_name}' -> '{new_name}' (no re-processing needed)"
            )

    await task_logger.log_task_success(
        log_entry,
        f"Markdown file document unchanged: {filename}",
        {
            "duplicate_detected": True,
            "existing_document_id": existing_document.id,
        },
    )
    logging.info(f"Document for markdown file {filename} unchanged. Skipping.")
    return True, existing_document
async def add_received_markdown_file_document(
session: AsyncSession,
file_name: str,
file_in_markdown: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
) -> Document | None:
"""
Process and store a markdown file document.
@ -39,6 +180,7 @@ async def add_received_markdown_file_document(
file_in_markdown: Content of the markdown file
search_space_id: ID of the search space
user_id: ID of the user
connector: Optional connector info for Google Drive files
Returns:
Document object if successful, None if failed
@ -58,39 +200,34 @@ async def add_received_markdown_file_document(
)
try:
# Generate unique identifier hash for this markdown file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file_name, search_space_id
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = _get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await _find_existing_document_with_migration(
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
await task_logger.log_task_success(
log_entry,
f"Markdown file document unchanged: {file_name}",
{
"duplicate_detected": True,
"existing_document_id": existing_document.id,
},
)
logging.info(
f"Document for markdown file {file_name} unchanged. Skipping."
)
return existing_document
else:
# Content has changed - update the existing document
logging.info(
f"Content changed for markdown file {file_name}. Updating document."
)
# Handle existing document (rename detection, content change check)
should_skip, doc = await _handle_existing_document_update(
session,
existing_document,
content_hash,
connector,
file_name,
primary_hash,
task_logger,
log_entry,
)
if should_skip:
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -139,10 +276,15 @@ async def add_received_markdown_file_document(
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.FILE,
document_type=doc_type,
document_metadata={
"FILE_NAME": file_name,
},
@ -150,7 +292,7 @@ async def add_received_markdown_file_document(
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
)