Mirror of https://github.com/MODSetter/SurfSense.git (synced 2026-05-06 06:12:40 +02:00)
feat: enhance Google Drive file metadata handling
- Updated Google Drive API calls to include md5Checksum in file metadata retrieval for improved content tracking.
- Added logic to check for rename-only updates based on md5Checksum, optimizing document processing by preventing unnecessary ETL operations for unchanged content.
- Enhanced existing document update logic to handle renaming and metadata updates more effectively, particularly for Google Drive files.
commit f538d59ca3
parent 49efc50767
5 changed files with 138 additions and 4 deletions
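The core idea: a file's md5Checksum changes only when its content changes, never on rename, so a matching checksum paired with a different filename identifies a pure rename. A minimal sketch of the decision rule (illustrative names only; the committed implementation is `_check_rename_only_update`, shown in full in the large hunk below):

```python
# Sketch of the rename-detection rule this commit adds; not the committed code.
def classify_change(incoming: dict, stored: dict) -> str:
    """Return 'unchanged', 'rename_only', or 'reprocess' for a Drive file."""
    incoming_md5 = incoming.get("md5Checksum")  # absent for Google Workspace files

    if incoming_md5 and stored.get("md5_checksum"):
        content_unchanged = incoming_md5 == stored["md5_checksum"]
    elif incoming_md5:
        return "reprocess"  # legacy document with no stored checksum yet
    elif stored.get("modified_time"):
        # Workspace fallback: modifiedTime also changes on rename, so it is a weaker signal.
        content_unchanged = incoming.get("modifiedTime") == stored["modified_time"]
    else:
        return "reprocess"

    if not content_unchanged:
        return "reprocess"
    old_name = stored.get("FILE_NAME")
    if old_name and old_name != incoming.get("name"):
        return "rename_only"
    return "unchanged"
```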
```diff
@@ -58,7 +58,7 @@ async def get_changes(
         params = {
             "pageToken": page_token,
             "pageSize": 100,
-            "fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, size, webViewLink, parents, trashed))",
+            "fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, md5Checksum, size, webViewLink, parents, trashed))",
             "supportsAllDrives": True,
             "includeItemsFromAllDrives": True,
         }
```
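For context, this is the Drive v3 changes feed that delta sync consumes. A sketch with the synchronous google-api-python-client (an assumption for illustration; the repo's own GoogleDriveClient is async), showing where md5Checksum lands in the response:

```python
from googleapiclient.discovery import build  # assumes google-api-python-client

service = build("drive", "v3", credentials=creds)  # creds obtained elsewhere
start = service.changes().getStartPageToken().execute()["startPageToken"]
resp = service.changes().list(
    pageToken=start,
    pageSize=100,
    fields=(
        "nextPageToken, newStartPageToken, changes(fileId, removed, "
        "file(id, name, mimeType, modifiedTime, md5Checksum, size, "
        "webViewLink, parents, trashed))"
    ),
    supportsAllDrives=True,
    includeItemsFromAllDrives=True,
).execute()
for change in resp.get("changes", []):
    f = change.get("file") or {}
    print(f.get("name"), f.get("md5Checksum"))  # md5Checksum is None for Workspace files
```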
```diff
@@ -47,7 +47,7 @@ class GoogleDriveClient:
     async def list_files(
         self,
         query: str = "",
-        fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, size, webViewLink, parents, owners, createdTime, description)",
+        fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, md5Checksum, size, webViewLink, parents, owners, createdTime, description)",
         page_size: int = 100,
         page_token: str | None = None,
     ) -> tuple[list[dict[str, Any]], str | None, str | None]:
```
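A caveat that motivates the fallback logic later in this commit: md5Checksum is only populated for binary content (uploads such as PDFs or images); Google Workspace files (Docs, Sheets, Slides) have no canonical byte stream until exported, so the field is simply absent. Continuing the same sketch client:

```python
files = service.files().list(
    q="'root' in parents and trashed = false",
    fields="files(id, name, mimeType, modifiedTime, md5Checksum)",
    pageSize=10,
).execute()["files"]
for f in files:
    workspace = f["mimeType"].startswith("application/vnd.google-apps.")
    # md5Checksum is absent for Workspace files, a hex digest for binary uploads
    print(f["name"], "workspace" if workspace else "binary", f.get("md5Checksum"))
```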
```diff
@@ -102,6 +102,8 @@ async def download_and_process_file(
         connector_info["metadata"]["file_size"] = file["size"]
     if "webViewLink" in file:
         connector_info["metadata"]["web_view_link"] = file["webViewLink"]
+    if "md5Checksum" in file:
+        connector_info["metadata"]["md5_checksum"] = file["md5Checksum"]
 
     if is_google_workspace_file(mime_type):
         connector_info["metadata"]["exported_as"] = "pdf"
```
```diff
@@ -157,7 +157,7 @@ async def get_file_by_id(
     try:
         file, error = await client.get_file_metadata(
             file_id,
-            fields="id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink",
+            fields="id, name, mimeType, parents, createdTime, modifiedTime, md5Checksum, size, webViewLink, iconLink",
         )
 
         if error:
```
```diff
@@ -228,7 +228,7 @@ async def list_folder_contents(
     while True:
         items, next_token, error = await client.list_files(
             query=query,
-            fields="files(id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink)",
+            fields="files(id, name, mimeType, parents, createdTime, modifiedTime, md5Checksum, size, webViewLink, iconLink)",
             page_size=1000,  # Max allowed by Google Drive API
             page_token=page_token,
         )
```
```diff
@@ -547,6 +547,117 @@ async def _index_with_delta_sync(
     return documents_indexed, documents_skipped
 
 
+async def _check_rename_only_update(
+    session: AsyncSession,
+    file: dict,
+    search_space_id: int,
+) -> tuple[bool, str | None]:
+    """
+    Check if a file only needs a rename update (no content change).
+
+    Uses md5Checksum comparison (preferred) or modifiedTime (fallback for Google Workspace files)
+    to detect if content has changed. This optimization prevents unnecessary ETL API calls
+    (Docling/LlamaCloud) for rename-only operations.
+
+    Args:
+        session: Database session
+        file: File metadata from Google Drive API
+        search_space_id: ID of the search space
+
+    Returns:
+        Tuple of (is_rename_only, message)
+        - (True, message): Only filename changed, document was updated
+        - (False, None): Content changed or new file, needs full processing
+    """
+    from sqlalchemy import select
+    from sqlalchemy.orm.attributes import flag_modified
+
+    from app.db import Document
+
+    file_id = file.get("id")
+    file_name = file.get("name", "Unknown")
+    incoming_md5 = file.get("md5Checksum")  # None for Google Workspace files
+    incoming_modified_time = file.get("modifiedTime")
+
+    if not file_id:
+        return False, None
+
+    # Try to find existing document by file_id-based hash (primary method)
+    primary_hash = generate_unique_identifier_hash(
+        DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
+    )
+    existing_document = await check_document_by_unique_identifier(session, primary_hash)
+
+    # If not found by primary hash, try searching by metadata (for legacy documents)
+    if not existing_document:
+        result = await session.execute(
+            select(Document).where(
+                Document.search_space_id == search_space_id,
+                Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
+                Document.document_metadata["google_drive_file_id"].astext == file_id,
+            )
+        )
+        existing_document = result.scalar_one_or_none()
+        if existing_document:
+            logger.debug(f"Found legacy document by metadata for file_id: {file_id}")
+
+    if not existing_document:
+        # New file, needs full processing
+        return False, None
+
+    # Get stored checksums/timestamps from document metadata
+    doc_metadata = existing_document.document_metadata or {}
+    stored_md5 = doc_metadata.get("md5_checksum")
+    stored_modified_time = doc_metadata.get("modified_time")
+
+    # Determine if content changed using md5Checksum (preferred) or modifiedTime (fallback)
+    content_unchanged = False
+
+    if incoming_md5 and stored_md5:
+        # Best case: Compare md5 checksums (only changes when content changes, not on rename)
+        content_unchanged = incoming_md5 == stored_md5
+        logger.debug(f"MD5 comparison for {file_name}: unchanged={content_unchanged}")
+    elif incoming_md5 and not stored_md5:
+        # Have incoming md5 but no stored md5 (legacy doc) - need to reprocess to store it
+        logger.debug(f"No stored md5 for {file_name}, will reprocess to store md5_checksum")
+        return False, None
+    elif not incoming_md5:
+        # Google Workspace file (no md5Checksum available) - fall back to modifiedTime
+        # Note: modifiedTime is less reliable as it changes on rename too, but it's the best we have
+        if incoming_modified_time and stored_modified_time:
+            content_unchanged = incoming_modified_time == stored_modified_time
+            logger.debug(f"ModifiedTime fallback for Google Workspace file {file_name}: unchanged={content_unchanged}")
+        else:
+            # No stored modifiedTime (legacy) - reprocess to store it
+            return False, None
+
+    if content_unchanged:
+        # Content hasn't changed - check if filename changed
+        old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get("google_drive_file_name")
+
+        if old_name and old_name != file_name:
+            # Rename-only update - update the document without re-processing
+            existing_document.title = file_name
+            if not existing_document.document_metadata:
+                existing_document.document_metadata = {}
+            existing_document.document_metadata["FILE_NAME"] = file_name
+            existing_document.document_metadata["google_drive_file_name"] = file_name
+            # Also update modified_time for Google Workspace files (since it changed on rename)
+            if incoming_modified_time:
+                existing_document.document_metadata["modified_time"] = incoming_modified_time
+            flag_modified(existing_document, "document_metadata")
+            await session.commit()
+
+            logger.info(f"Rename-only update: '{old_name}' → '{file_name}' (skipped ETL)")
+            return True, f"File renamed: '{old_name}' → '{file_name}' (no content change)"
+        else:
+            # Neither content nor name changed
+            logger.debug(f"File unchanged: {file_name}")
+            return True, "File unchanged (same content and name)"
+
+    # Content changed - needs full processing
+    return False, None
+
+
 async def _process_single_file(
     drive_client: GoogleDriveClient,
     session: AsyncSession,
```
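One subtlety in the function just added above: document_metadata is a JSON column, and SQLAlchemy does not notice in-place mutation of a plain dict (unless the column is wrapped in MutableDict), so the flag_modified call is what makes the rename actually persist. The pattern in isolation (Document and session as in the app code, doc_id hypothetical):

```python
from sqlalchemy.orm.attributes import flag_modified

doc = await session.get(Document, doc_id)           # load the existing row
doc.document_metadata["FILE_NAME"] = "renamed.pdf"  # in-place dict mutation: not tracked
flag_modified(doc, "document_metadata")             # explicitly mark the column dirty
await session.commit()                              # without the flag, nothing may be written
```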
```diff
@@ -569,6 +680,27 @@ async def _process_single_file(
     try:
         logger.info(f"Processing file: {file_name} ({mime_type})")
 
+        # Early check: Is this a rename-only update?
+        # This optimization prevents downloading and ETL processing for files
+        # where only the name changed but content is the same.
+        is_rename_only, rename_message = await _check_rename_only_update(
+            session=session,
+            file=file,
+            search_space_id=search_space_id,
+        )
+
+        if is_rename_only:
+            await task_logger.log_task_progress(
+                log_entry,
+                f"Skipped ETL for {file_name}: {rename_message}",
+                {"status": "rename_only", "reason": rename_message},
+            )
+            # Return 1 for renamed files (they are "indexed" in the sense that they're updated)
+            # Return 0 for unchanged files
+            if "renamed" in (rename_message or "").lower():
+                return 1, 0
+            return 0, 1
+
         _, error, _ = await download_and_process_file(
             client=drive_client,
             file=file,
```
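The (1, 0) / (0, 1) pairs returned above feed the (documents_indexed, documents_skipped) counters that _index_with_delta_sync reports. Roughly, the aggregation on the caller side looks like this (a sketch only; the real _process_single_file takes more arguments than shown here):

```python
async def index_files(drive_client, session, files) -> tuple[int, int]:
    # Hedged sketch of the counter aggregation, arguments abbreviated.
    documents_indexed = 0
    documents_skipped = 0
    for file in files:
        indexed, skipped = await _process_single_file(drive_client, session, file)
        documents_indexed += indexed
        documents_skipped += skipped
    return documents_indexed, documents_skipped
```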