refactor: enhance file skipping logic across Dropbox, Google Drive, and OneDrive connectors to return unsupported extensions, improving error reporting and maintainability

This commit is contained in:
Anish Sarkar 2026-04-07 03:16:34 +05:30
parent e7beeb2a36
commit 3a1d700817
14 changed files with 344 additions and 160 deletions

View file

@ -42,18 +42,25 @@ def is_paper_file(item: dict) -> bool:
return ext == PAPER_EXTENSION
def should_skip_file(item: dict) -> bool:
def should_skip_file(item: dict) -> tuple[bool, str | None]:
"""Skip folders and truly non-indexable files.
Paper docs are non-downloadable but exportable, so they are NOT skipped.
Returns (should_skip, unsupported_extension_or_None).
"""
if is_folder(item):
return True
return True, None
if is_paper_file(item):
return False
return False, None
if not item.get("is_downloadable", True):
return True
return True, None
from pathlib import PurePosixPath
from app.config import config as app_config
name = item.get("name", "")
return should_skip_for_service(name, app_config.ETL_SERVICE)
if should_skip_for_service(name, app_config.ETL_SERVICE):
ext = PurePosixPath(name).suffix.lower()
return True, ext
return False, None

View file

@ -48,11 +48,19 @@ def should_skip_file(mime_type: str) -> bool:
return mime_type in [GOOGLE_FOLDER, GOOGLE_SHORTCUT]
def should_skip_by_extension(filename: str) -> bool:
"""Return True if the file extension is not parseable by the configured ETL service."""
def should_skip_by_extension(filename: str) -> tuple[bool, str | None]:
"""Check if the file extension is not parseable by the configured ETL service.
Returns (should_skip, unsupported_extension_or_None).
"""
from pathlib import PurePosixPath
from app.config import config as app_config
return should_skip_for_service(filename, app_config.ETL_SERVICE)
if should_skip_for_service(filename, app_config.ETL_SERVICE):
ext = PurePosixPath(filename).suffix.lower()
return True, ext
return False, None
def get_export_mime_type(mime_type: str) -> str | None:

View file

@ -40,18 +40,28 @@ def is_folder(item: dict) -> bool:
return ONEDRIVE_FOLDER_FACET in item
def should_skip_file(item: dict) -> bool:
"""Skip folders, OneNote files, remote items (shared links), packages, and unsupported extensions."""
def should_skip_file(item: dict) -> tuple[bool, str | None]:
"""Skip folders, OneNote files, remote items, packages, and unsupported extensions.
Returns (should_skip, unsupported_extension_or_None).
The second element is only set when the skip is due to an unsupported extension.
"""
if is_folder(item):
return True
return True, None
if "remoteItem" in item:
return True
return True, None
if "package" in item:
return True
return True, None
mime = item.get("file", {}).get("mimeType", "")
if mime in SKIP_MIME_TYPES:
return True
return True, None
from pathlib import PurePosixPath
from app.config import config as app_config
name = item.get("name", "")
return should_skip_for_service(name, app_config.ETL_SERVICE)
if should_skip_for_service(name, app_config.ETL_SERVICE):
ext = PurePosixPath(name).suffix.lower()
return True, ext
return False, None

View file

@ -2477,6 +2477,8 @@ async def run_google_drive_indexing(
stage="fetching",
)
total_unsupported = 0
# Index each folder with indexing options
for folder in items.folders:
try:
@ -2484,6 +2486,7 @@ async def run_google_drive_indexing(
indexed_count,
skipped_count,
error_message,
unsupported_count,
) = await index_google_drive_files(
session,
connector_id,
@ -2497,6 +2500,7 @@ async def run_google_drive_indexing(
include_subfolders=indexing_options.include_subfolders,
)
total_skipped += skipped_count
total_unsupported += unsupported_count
if error_message:
errors.append(f"Folder '{folder.name}': {error_message}")
else:
@ -2572,6 +2576,7 @@ async def run_google_drive_indexing(
indexed_count=total_indexed,
error_message=error_message,
skipped_count=total_skipped,
unsupported_count=total_unsupported,
)
except Exception as e:
@ -2642,7 +2647,7 @@ async def run_onedrive_indexing(
stage="fetching",
)
total_indexed, total_skipped, error_message = await index_onedrive_files(
total_indexed, total_skipped, error_message, total_unsupported = await index_onedrive_files(
session,
connector_id,
search_space_id,
@ -2683,6 +2688,7 @@ async def run_onedrive_indexing(
indexed_count=total_indexed,
error_message=error_message,
skipped_count=total_skipped,
unsupported_count=total_unsupported,
)
except Exception as e:
@ -2750,7 +2756,7 @@ async def run_dropbox_indexing(
stage="fetching",
)
total_indexed, total_skipped, error_message = await index_dropbox_files(
total_indexed, total_skipped, error_message, total_unsupported = await index_dropbox_files(
session,
connector_id,
search_space_id,
@ -2791,6 +2797,7 @@ async def run_dropbox_indexing(
indexed_count=total_indexed,
error_message=error_message,
skipped_count=total_skipped,
unsupported_count=total_unsupported,
)
except Exception as e:

View file

@ -421,6 +421,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
error_message: str | None = None,
is_warning: bool = False,
skipped_count: int | None = None,
unsupported_count: int | None = None,
) -> Notification:
"""
Update notification when connector indexing completes.
@ -428,10 +429,11 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
Args:
session: Database session
notification: Notification to update
indexed_count: Total number of items indexed
indexed_count: Total number of files indexed
error_message: Error message if indexing failed, or warning message (optional)
is_warning: If True, treat error_message as a warning (success case) rather than an error
skipped_count: Number of items skipped (e.g., duplicates) - optional
skipped_count: Number of files skipped (e.g., unchanged) - optional
unsupported_count: Number of files skipped because the ETL parser doesn't support them
Returns:
Updated notification
@ -440,52 +442,45 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"connector_name", "Connector"
)
# Build the skipped text if there are skipped items
skipped_text = ""
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
skipped_text = (
f" ({skipped_count} {skipped_item_text} skipped - already indexed)"
)
unsupported_text = ""
if unsupported_count and unsupported_count > 0:
file_word = "file was" if unsupported_count == 1 else "files were"
unsupported_text = f" {unsupported_count} {file_word} not supported."
# If there's an error message but items were indexed, treat it as a warning (partial success)
# If is_warning is True, treat it as success even with 0 items (e.g., duplicates found)
# Otherwise, treat it as a failure
if error_message:
if indexed_count > 0:
# Partial success with warnings (e.g., duplicate content from other connectors)
title = f"Ready: {connector_name}"
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}"
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}"
status = "completed"
elif is_warning:
# Warning case (e.g., duplicates found) - treat as success
title = f"Ready: {connector_name}"
message = f"Sync completed{skipped_text}. {error_message}"
message = f"Sync complete.{unsupported_text} {error_message}"
status = "completed"
else:
# Complete failure
title = f"Failed: {connector_name}"
message = f"Sync failed: {error_message}"
if unsupported_text:
message += unsupported_text
status = "failed"
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)."
if unsupported_count and unsupported_count > 0:
message = f"Sync complete.{unsupported_text}"
else:
message = "Already up to date! No new items to sync."
message = "Already up to date!"
else:
item_text = "item" if indexed_count == 1 else "items"
message = (
f"Now searchable! {indexed_count} {item_text} synced{skipped_text}."
)
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced."
if unsupported_text:
message += unsupported_text
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"skipped_count": skipped_count or 0,
"unsupported_count": unsupported_count or 0,
"sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0)
else "failed",

View file

@ -51,7 +51,10 @@ async def _should_skip_file(
file_id = file.get("id", "")
file_name = file.get("name", "Unknown")
if skip_item(file):
skip, unsup_ext = skip_item(file)
if skip:
if unsup_ext:
return True, f"unsupported:{unsup_ext}"
return True, "folder/non-downloadable"
if not file_id:
return True, "missing file_id"
@ -287,7 +290,7 @@ async def _index_with_delta_sync(
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int, str]:
) -> tuple[int, int, int, str]:
"""Delta sync using Dropbox cursor-based change tracking.
Returns (indexed_count, skipped_count, unsupported_count, new_cursor).
@ -309,12 +312,13 @@ async def _index_with_delta_sync(
if not entries:
logger.info("No changes detected since last sync")
return 0, 0, new_cursor or cursor
return 0, 0, 0, new_cursor or cursor
logger.info(f"Processing {len(entries)} change entries")
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
files_processed = 0
@ -339,7 +343,9 @@ async def _index_with_delta_sync(
skip, msg = await _should_skip_file(session, entry, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -360,9 +366,10 @@ async def _index_with_delta_sync(
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Delta sync complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped, new_cursor or cursor
return indexed, skipped, unsupported_count, new_cursor or cursor
async def _index_full_scan(
@ -380,8 +387,11 @@ async def _index_full_scan(
incremental_sync: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Full scan indexing of a folder."""
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
@ -401,6 +411,7 @@ async def _index_full_scan(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
all_files, error = await get_files_in_folder(
@ -420,14 +431,21 @@ async def _index_full_scan(
if incremental_sync:
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
continue
elif skip_item(file):
skipped += 1
continue
else:
item_skip, item_unsup = skip_item(file)
if item_skip:
if item_unsup:
unsupported_count += 1
else:
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
@ -466,9 +484,10 @@ async def _index_full_scan(
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Full scan complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
async def _index_selected_files(
@ -493,6 +512,7 @@ async def _index_selected_files(
errors: list[str] = []
renamed_count = 0
skipped = 0
unsupported_count = 0
for file_path, file_name in file_paths:
file, error = await get_file_by_path(dropbox_client, file_path)
@ -504,14 +524,21 @@ async def _index_selected_files(
if incremental_sync:
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
continue
elif skip_item(file):
skipped += 1
continue
else:
item_skip, item_unsup = skip_item(file)
if item_skip:
if item_unsup:
unsupported_count += 1
else:
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
@ -543,7 +570,7 @@ async def _index_selected_files(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
return renamed_count + batch_indexed, skipped, unsupported_count, errors
async def index_dropbox_files(
@ -552,7 +579,7 @@ async def index_dropbox_files(
search_space_id: int,
user_id: str,
items_dict: dict,
) -> tuple[int, int, str | None]:
) -> tuple[int, int, str | None, int]:
"""Index Dropbox files for a specific connector.
items_dict format:
@ -583,7 +610,7 @@ async def index_dropbox_files(
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
@ -594,7 +621,7 @@ async def index_dropbox_files(
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
dropbox_client = DropboxClient(session, connector_id)
@ -609,6 +636,7 @@ async def index_dropbox_files(
total_indexed = 0
total_skipped = 0
total_unsupported = 0
selected_files = items_dict.get("files", [])
if selected_files:
@ -616,7 +644,7 @@ async def index_dropbox_files(
(f.get("path", f.get("path_lower", f.get("id", ""))), f.get("name"))
for f in selected_files
]
indexed, skipped, file_errors = await _index_selected_files(
indexed, skipped, unsupported, file_errors = await _index_selected_files(
dropbox_client,
session,
file_tuples,
@ -628,6 +656,7 @@ async def index_dropbox_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsupported
if file_errors:
logger.warning(
f"File indexing errors for connector {connector_id}: {file_errors}"
@ -649,7 +678,7 @@ async def index_dropbox_files(
if can_use_delta:
logger.info(f"Using delta sync for folder {folder_name}")
indexed, skipped, new_cursor = await _index_with_delta_sync(
indexed, skipped, unsup, new_cursor = await _index_with_delta_sync(
dropbox_client,
session,
connector_id,
@ -662,9 +691,10 @@ async def index_dropbox_files(
enable_summary=connector_enable_summary,
)
folder_cursors[folder_path] = new_cursor
total_unsupported += unsup
else:
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
indexed, skipped, unsup = await _index_full_scan(
dropbox_client,
session,
connector_id,
@ -679,6 +709,7 @@ async def index_dropbox_files(
incremental_sync=incremental_sync,
enable_summary=connector_enable_summary,
)
total_unsupported += unsup
total_indexed += indexed
total_skipped += skipped
@ -708,12 +739,14 @@ async def index_dropbox_files(
await task_logger.log_task_success(
log_entry,
f"Successfully completed Dropbox indexing for connector {connector_id}",
{"files_processed": total_indexed, "files_skipped": total_skipped},
{"files_processed": total_indexed, "files_skipped": total_skipped, "files_unsupported": total_unsupported},
)
logger.info(
f"Dropbox indexing completed: {total_indexed} indexed, {total_skipped} skipped"
f"Dropbox indexing completed: {total_indexed} indexed, "
f"{total_skipped} skipped, {total_unsupported} unsupported"
)
return total_indexed, total_skipped, None
return total_indexed, total_skipped, None, total_unsupported
except SQLAlchemyError as db_error:
await session.rollback()
@ -724,7 +757,7 @@ async def index_dropbox_files(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}", 0
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -734,4 +767,4 @@ async def index_dropbox_files(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Dropbox files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index Dropbox files: {e!s}"
return 0, 0, f"Failed to index Dropbox files: {e!s}", 0

View file

@ -81,8 +81,9 @@ async def _should_skip_file(
if skip_mime(mime_type):
return True, "folder/shortcut"
if should_skip_by_extension(file_name):
return True, "unsupported extension"
ext_skip, unsup_ext = should_skip_by_extension(file_name)
if ext_skip:
return True, f"unsupported:{unsup_ext}"
if not file_id:
return True, "missing file_id"
@ -490,6 +491,7 @@ async def _index_selected_files(
errors: list[str] = []
renamed_count = 0
skipped = 0
unsupported_count = 0
for file_id, file_name in file_ids:
file, error = await get_file_by_id(drive_client, file_id)
@ -500,7 +502,9 @@ async def _index_selected_files(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -544,7 +548,7 @@ async def _index_selected_files(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
return renamed_count + batch_indexed, skipped, unsupported_count, errors
# ---------------------------------------------------------------------------
@ -567,8 +571,11 @@ async def _index_full_scan(
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Full scan indexing of a folder."""
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
@ -590,6 +597,7 @@ async def _index_full_scan(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_processed = 0
files_to_download: list[dict] = []
folders_to_process = [(folder_id, folder_name)]
@ -630,7 +638,9 @@ async def _index_full_scan(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -703,9 +713,10 @@ async def _index_full_scan(
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Full scan complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
async def _index_with_delta_sync(
@ -723,8 +734,11 @@ async def _index_with_delta_sync(
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Delta sync using change tracking."""
) -> tuple[int, int, int]:
"""Delta sync using change tracking.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting delta sync from token: {start_page_token[:20]}...",
@ -759,6 +773,7 @@ async def _index_with_delta_sync(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
files_processed = 0
@ -780,7 +795,9 @@ async def _index_with_delta_sync(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -837,9 +854,10 @@ async def _index_with_delta_sync(
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Delta sync complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
# ---------------------------------------------------------------------------
@ -859,8 +877,11 @@ async def index_google_drive_files(
max_files: int = 500,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, str | None]:
"""Index Google Drive files for a specific connector."""
) -> tuple[int, int, str | None, int]:
"""Index Google Drive files for a specific connector.
Returns (indexed, skipped, error_or_none, unsupported_count).
"""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="google_drive_files_indexing",
@ -886,7 +907,7 @@ async def index_google_drive_files(
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
await task_logger.log_task_progress(
log_entry,
@ -905,7 +926,7 @@ async def index_google_drive_files(
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
@ -920,6 +941,7 @@ async def index_google_drive_files(
0,
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
0,
)
connector_enable_summary = getattr(connector, "enable_summary", True)
@ -932,7 +954,7 @@ async def index_google_drive_files(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingParameter"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
target_folder_id = folder_id
target_folder_name = folder_name or "Selected Folder"
@ -943,9 +965,11 @@ async def index_google_drive_files(
use_delta_sync and start_page_token and connector.last_indexed_at
)
documents_unsupported = 0
if can_use_delta:
logger.info(f"Using delta sync for connector {connector_id}")
documents_indexed, documents_skipped = await _index_with_delta_sync(
documents_indexed, documents_skipped, du = await _index_with_delta_sync(
drive_client,
session,
connector,
@ -961,8 +985,9 @@ async def index_google_drive_files(
on_heartbeat_callback,
connector_enable_summary,
)
documents_unsupported += du
logger.info("Running reconciliation scan after delta sync")
ri, rs = await _index_full_scan(
ri, rs, ru = await _index_full_scan(
drive_client,
session,
connector,
@ -980,9 +1005,10 @@ async def index_google_drive_files(
)
documents_indexed += ri
documents_skipped += rs
documents_unsupported += ru
else:
logger.info(f"Using full scan for connector {connector_id}")
documents_indexed, documents_skipped = await _index_full_scan(
documents_indexed, documents_skipped, documents_unsupported = await _index_full_scan(
drive_client,
session,
connector,
@ -1017,14 +1043,17 @@ async def index_google_drive_files(
{
"files_processed": documents_indexed,
"files_skipped": documents_skipped,
"files_unsupported": documents_unsupported,
"sync_type": "delta" if can_use_delta else "full",
"folder": target_folder_name,
},
)
logger.info(
f"Google Drive indexing completed: {documents_indexed} indexed, {documents_skipped} skipped"
f"Google Drive indexing completed: {documents_indexed} indexed, "
f"{documents_skipped} skipped, {documents_unsupported} unsupported"
)
return documents_indexed, documents_skipped, None
return documents_indexed, documents_skipped, None, documents_unsupported
except SQLAlchemyError as db_error:
await session.rollback()
@ -1035,7 +1064,7 @@ async def index_google_drive_files(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}", 0
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -1045,7 +1074,7 @@ async def index_google_drive_files(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index Google Drive files: {e!s}"
return 0, 0, f"Failed to index Google Drive files: {e!s}", 0
async def index_google_drive_single_file(
@ -1247,7 +1276,7 @@ async def index_google_drive_selected_files(
session, connector_id, credentials=pre_built_credentials
)
indexed, skipped, errors = await _index_selected_files(
indexed, skipped, unsupported, errors = await _index_selected_files(
drive_client,
session,
files,
@ -1258,6 +1287,11 @@ async def index_google_drive_selected_files(
on_heartbeat=on_heartbeat_callback,
)
if unsupported > 0:
file_text = "file was" if unsupported == 1 else "files were"
unsup_msg = f"{unsupported} {file_text} not supported"
errors.append(unsup_msg)
await session.commit()
if errors:
@ -1265,7 +1299,7 @@ async def index_google_drive_selected_files(
log_entry,
f"Batch file indexing completed with {len(errors)} error(s)",
"; ".join(errors),
{"indexed": indexed, "skipped": skipped, "error_count": len(errors)},
{"indexed": indexed, "skipped": skipped, "unsupported": unsupported, "error_count": len(errors)},
)
else:
await task_logger.log_task_success(

View file

@ -56,7 +56,10 @@ async def _should_skip_file(
file_id = file.get("id")
file_name = file.get("name", "Unknown")
if skip_item(file):
skip, unsup_ext = skip_item(file)
if skip:
if unsup_ext:
return True, f"unsupported:{unsup_ext}"
return True, "folder/onenote/remote"
if not file_id:
return True, "missing file_id"
@ -301,6 +304,7 @@ async def _index_selected_files(
errors: list[str] = []
renamed_count = 0
skipped = 0
unsupported_count = 0
for file_id, file_name in file_ids:
file, error = await get_file_by_id(onedrive_client, file_id)
@ -311,7 +315,9 @@ async def _index_selected_files(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -347,7 +353,7 @@ async def _index_selected_files(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
return renamed_count + batch_indexed, skipped, unsupported_count, errors
# ---------------------------------------------------------------------------
@ -369,8 +375,11 @@ async def _index_full_scan(
include_subfolders: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Full scan indexing of a folder."""
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
@ -389,6 +398,7 @@ async def _index_full_scan(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
all_files, error = await get_files_in_folder(
@ -407,7 +417,9 @@ async def _index_full_scan(
for file in all_files[:max_files]:
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -450,9 +462,10 @@ async def _index_full_scan(
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Full scan complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
async def _index_with_delta_sync(
@ -468,8 +481,11 @@ async def _index_with_delta_sync(
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int, str | None]:
"""Delta sync using OneDrive change tracking. Returns (indexed, skipped, new_delta_link)."""
) -> tuple[int, int, int, str | None]:
"""Delta sync using OneDrive change tracking.
Returns (indexed, skipped, unsupported_count, new_delta_link).
"""
await task_logger.log_task_progress(
log_entry,
"Starting delta sync",
@ -489,7 +505,7 @@ async def _index_with_delta_sync(
if not changes:
logger.info("No changes detected since last sync")
return 0, 0, new_delta_link
return 0, 0, 0, new_delta_link
logger.info(f"Processing {len(changes)} delta changes")
@ -501,6 +517,7 @@ async def _index_with_delta_sync(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
files_processed = 0
@ -523,7 +540,9 @@ async def _index_with_delta_sync(
skip, msg = await _should_skip_file(session, change, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -566,9 +585,10 @@ async def _index_with_delta_sync(
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Delta sync complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped, new_delta_link
return indexed, skipped, unsupported_count, new_delta_link
# ---------------------------------------------------------------------------
@ -582,7 +602,7 @@ async def index_onedrive_files(
search_space_id: int,
user_id: str,
items_dict: dict,
) -> tuple[int, int, str | None]:
) -> tuple[int, int, str | None, int]:
"""Index OneDrive files for a specific connector.
items_dict format:
@ -609,7 +629,7 @@ async def index_onedrive_files(
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
@ -620,7 +640,7 @@ async def index_onedrive_files(
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
onedrive_client = OneDriveClient(session, connector_id)
@ -632,12 +652,13 @@ async def index_onedrive_files(
total_indexed = 0
total_skipped = 0
total_unsupported = 0
# Index selected individual files
selected_files = items_dict.get("files", [])
if selected_files:
file_tuples = [(f["id"], f.get("name")) for f in selected_files]
indexed, skipped, _errors = await _index_selected_files(
indexed, skipped, unsupported, _errors = await _index_selected_files(
onedrive_client,
session,
file_tuples,
@ -648,6 +669,7 @@ async def index_onedrive_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsupported
# Index selected folders
folders = items_dict.get("folders", [])
@ -661,7 +683,7 @@ async def index_onedrive_files(
if can_use_delta:
logger.info(f"Using delta sync for folder {folder_name}")
indexed, skipped, new_delta_link = await _index_with_delta_sync(
indexed, skipped, unsup, new_delta_link = await _index_with_delta_sync(
onedrive_client,
session,
connector_id,
@ -676,6 +698,7 @@ async def index_onedrive_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsup
if new_delta_link:
await session.refresh(connector)
@ -685,7 +708,7 @@ async def index_onedrive_files(
flag_modified(connector, "config")
# Reconciliation full scan
ri, rs = await _index_full_scan(
ri, rs, ru = await _index_full_scan(
onedrive_client,
session,
connector_id,
@ -701,9 +724,10 @@ async def index_onedrive_files(
)
total_indexed += ri
total_skipped += rs
total_unsupported += ru
else:
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
indexed, skipped, unsup = await _index_full_scan(
onedrive_client,
session,
connector_id,
@ -719,6 +743,7 @@ async def index_onedrive_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsup
# Store new delta link for this folder
_, new_delta_link, _ = await onedrive_client.get_delta(folder_id=folder_id)
@ -737,12 +762,14 @@ async def index_onedrive_files(
await task_logger.log_task_success(
log_entry,
f"Successfully completed OneDrive indexing for connector {connector_id}",
{"files_processed": total_indexed, "files_skipped": total_skipped},
{"files_processed": total_indexed, "files_skipped": total_skipped, "files_unsupported": total_unsupported},
)
logger.info(
f"OneDrive indexing completed: {total_indexed} indexed, {total_skipped} skipped"
f"OneDrive indexing completed: {total_indexed} indexed, "
f"{total_skipped} skipped, {total_unsupported} unsupported"
)
return total_indexed, total_skipped, None
return total_indexed, total_skipped, None, total_unsupported
except SQLAlchemyError as db_error:
await session.rollback()
@ -753,7 +780,7 @@ async def index_onedrive_files(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}", 0
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -763,4 +790,4 @@ async def index_onedrive_files(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index OneDrive files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index OneDrive files: {e!s}"
return 0, 0, f"Failed to index OneDrive files: {e!s}", 0

View file

@ -265,7 +265,10 @@ def full_scan_mocks(mock_dropbox_client, monkeypatch):
async def _fake_skip(session, file, search_space_id):
    """Fake skip check: apply the Dropbox file-type filter, then canned results.

    Returns a ``(skipped, reason)`` pair. Items rejected by
    ``should_skip_file`` get a reason string; everything else is looked up by
    file id in the enclosing fixture's ``skip_results`` mapping.
    """
    from app.connectors.dropbox.file_types import should_skip_file as _skip

    # should_skip_file returns (skip, unsupported_ext); testing the tuple
    # itself for truthiness would always be True, so unpack it first.
    item_skip, unsup_ext = _skip(file)
    if item_skip:
        if unsup_ext:
            return True, f"unsupported:{unsup_ext}"
        return True, "folder/non-downloadable"
    return skip_results.get(file.get("id", ""), (False, None))
@ -541,7 +544,7 @@ async def test_delta_sync_deletions_call_remove_document(monkeypatch):
mock_task_logger = MagicMock()
mock_task_logger.log_task_progress = AsyncMock()
indexed, skipped, cursor = await _index_with_delta_sync(
indexed, skipped, unsupported, cursor = await _index_with_delta_sync(
mock_client,
AsyncMock(),
_CONNECTOR_ID,
@ -578,7 +581,7 @@ async def test_delta_sync_upserts_filtered_and_downloaded(monkeypatch):
mock_task_logger = MagicMock()
mock_task_logger.log_task_progress = AsyncMock()
indexed, skipped, cursor = await _index_with_delta_sync(
indexed, skipped, unsupported, cursor = await _index_with_delta_sync(
mock_client,
AsyncMock(),
_CONNECTOR_ID,
@ -628,7 +631,7 @@ async def test_delta_sync_mix_deletions_and_upserts(monkeypatch):
mock_task_logger = MagicMock()
mock_task_logger.log_task_progress = AsyncMock()
indexed, skipped, cursor = await _index_with_delta_sync(
indexed, skipped, unsupported, cursor = await _index_with_delta_sync(
mock_client,
AsyncMock(),
_CONNECTOR_ID,
@ -662,7 +665,7 @@ async def test_delta_sync_returns_new_cursor(monkeypatch):
mock_task_logger = MagicMock()
mock_task_logger.log_task_progress = AsyncMock()
indexed, skipped, cursor = await _index_with_delta_sync(
indexed, skipped, unsupported, cursor = await _index_with_delta_sync(
mock_client,
AsyncMock(),
_CONNECTOR_ID,

View file

@ -497,7 +497,7 @@ async def test_delta_sync_removals_serial_rest_parallel(monkeypatch):
mock_task_logger = MagicMock()
mock_task_logger.log_task_progress = AsyncMock()
indexed, skipped = await _index_with_delta_sync(
indexed, skipped, unsupported = await _index_with_delta_sync(
MagicMock(),
mock_session,
MagicMock(),

View file

@ -384,7 +384,7 @@ async def test_gdrive_full_scan_skips_over_quota(gdrive_full_scan_mocks, monkeyp
m["download_mock"].return_value = ([], 0)
m["batch_mock"].return_value = ([], 2, 0)
_indexed, skipped = await _run_gdrive_full_scan(m)
_indexed, skipped, _unsup = await _run_gdrive_full_scan(m)
call_files = m["download_mock"].call_args[0][1]
assert len(call_files) == 2
@ -459,7 +459,7 @@ async def test_gdrive_delta_sync_skips_over_quota(monkeypatch):
mock_task_logger = MagicMock()
mock_task_logger.log_task_progress = AsyncMock()
_indexed, skipped = await _mod._index_with_delta_sync(
_indexed, skipped, _unsupported = await _mod._index_with_delta_sync(
MagicMock(),
session,
MagicMock(),

View file

@ -14,17 +14,23 @@ pytestmark = pytest.mark.unit
def test_folder_item_is_skipped():
    """A folder-tagged item is skipped and reports no unsupported extension."""
    item = {".tag": "folder", "name": "My Folder"}
    # should_skip_file returns a (skip, extension) tuple; comparing the whole
    # result to True can never succeed, so unpack before asserting.
    skip, ext = should_skip_file(item)
    assert skip is True
    assert ext is None
def test_paper_file_is_not_skipped():
    """A .paper file is kept even though it is flagged non-downloadable."""
    item = {".tag": "file", "name": "notes.paper", "is_downloadable": False}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is False
    assert ext is None
def test_non_downloadable_item_is_skipped():
    """A non-downloadable, non-paper file is skipped with no extension reported."""
    item = {".tag": "file", "name": "locked.gdoc", "is_downloadable": False}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is True
    assert ext is None
# ---------------------------------------------------------------------------
@ -49,7 +55,9 @@ def test_non_downloadable_item_is_skipped():
def test_non_parseable_extensions_are_skipped(filename, mocker):
    """With DOCLING configured, each parametrized filename is skipped and an extension is reported."""
    mocker.patch("app.config.config.ETL_SERVICE", "DOCLING")
    item = {".tag": "file", "name": filename}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is True, f"{filename} should be skipped"
    assert ext is not None
@pytest.mark.parametrize(
@ -65,9 +73,9 @@ def test_parseable_documents_are_not_skipped(filename, mocker):
for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"):
mocker.patch("app.config.config.ETL_SERVICE", service)
item = {".tag": "file", "name": filename}
assert should_skip_file(item) is False, (
f"{filename} should NOT be skipped with {service}"
)
skip, ext = should_skip_file(item)
assert skip is False, f"{filename} should NOT be skipped with {service}"
assert ext is None
@pytest.mark.parametrize(
@ -79,9 +87,9 @@ def test_universal_images_are_not_skipped(filename, mocker):
for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"):
mocker.patch("app.config.config.ETL_SERVICE", service)
item = {".tag": "file", "name": filename}
assert should_skip_file(item) is False, (
f"{filename} should NOT be skipped with {service}"
)
skip, ext = should_skip_file(item)
assert skip is False, f"{filename} should NOT be skipped with {service}"
assert ext is None
@pytest.mark.parametrize("filename,service,expected_skip", [
@ -111,6 +119,20 @@ def test_universal_images_are_not_skipped(filename, mocker):
def test_parser_specific_extensions(filename, service, expected_skip, mocker):
    """The skip decision for a Dropbox file matches the expectation for the given ETL service."""
    mocker.patch("app.config.config.ETL_SERVICE", service)
    item = {".tag": "file", "name": filename}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is expected_skip, (
        f"{filename} with {service}: expected skip={expected_skip}"
    )
    # An extension is reported exactly when the file is skipped.
    if expected_skip:
        assert ext is not None
    else:
        assert ext is None
def test_returns_unsupported_extension(mocker):
    """A Dropbox file skipped for its extension reports that extension string."""
    mocker.patch("app.config.config.ETL_SERVICE", "DOCLING")
    should_skip, extension = should_skip_file({".tag": "file", "name": "old.doc"})
    assert should_skip is True
    assert extension == ".doc"

View file

@ -14,7 +14,8 @@ def test_unsupported_extensions_are_skipped_regardless_of_service(filename, mock
"""Truly unsupported files are skipped no matter which ETL service is configured."""
for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"):
mocker.patch("app.config.config.ETL_SERVICE", service)
assert should_skip_by_extension(filename) is True
skip, ext = should_skip_by_extension(filename)
assert skip is True
@pytest.mark.parametrize("filename", [
@ -25,9 +26,9 @@ def test_universal_extensions_are_not_skipped(filename, mocker):
"""Files supported by all parsers (or handled by plaintext/direct_convert) are never skipped."""
for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"):
mocker.patch("app.config.config.ETL_SERVICE", service)
assert should_skip_by_extension(filename) is False, (
f"{filename} should NOT be skipped with {service}"
)
skip, ext = should_skip_by_extension(filename)
assert skip is False, f"{filename} should NOT be skipped with {service}"
assert ext is None
@pytest.mark.parametrize("filename,service,expected_skip", [
@ -42,6 +43,19 @@ def test_universal_extensions_are_not_skipped(filename, mocker):
])
def test_parser_specific_extensions(filename, service, expected_skip, mocker):
    """The extension-based skip decision matches the expectation for the given ETL service."""
    mocker.patch("app.config.config.ETL_SERVICE", service)
    # should_skip_by_extension returns a (skip, extension) tuple; unpack it
    # rather than comparing the whole result to a bool.
    skip, ext = should_skip_by_extension(filename)
    assert skip is expected_skip, (
        f"{filename} with {service}: expected skip={expected_skip}"
    )
    # An extension is reported exactly when the file is skipped.
    if expected_skip:
        assert ext is not None, "unsupported extension should be returned"
    else:
        assert ext is None
def test_returns_unsupported_extension(mocker):
    """A filename skipped by extension reports the offending extension string."""
    mocker.patch("app.config.config.ETL_SERVICE", "DOCLING")
    should_skip, extension = should_skip_by_extension("macro.docm")
    assert should_skip is True
    assert extension == ".docm"

View file

@ -14,22 +14,30 @@ pytestmark = pytest.mark.unit
def test_folder_is_skipped():
    """An item carrying a "folder" facet is skipped with no extension reported."""
    item = {"folder": {}, "name": "My Folder"}
    # should_skip_file returns a (skip, extension) tuple; comparing the whole
    # result to True can never succeed, so unpack before asserting.
    skip, ext = should_skip_file(item)
    assert skip is True
    assert ext is None
def test_remote_item_is_skipped():
    """An item carrying a "remoteItem" facet is skipped with no extension reported."""
    item = {"remoteItem": {}, "name": "shared.docx"}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is True
    assert ext is None
def test_package_is_skipped():
    """An item carrying a "package" facet is skipped with no extension reported."""
    item = {"package": {}, "name": "notebook"}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is True
    assert ext is None
def test_onenote_is_skipped():
    """A file with the OneNote MIME type is skipped with no extension reported."""
    item = {"name": "notes", "file": {"mimeType": "application/msonenote"}}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is True
    assert ext is None
# ---------------------------------------------------------------------------
@ -43,7 +51,9 @@ def test_onenote_is_skipped():
def test_unsupported_extensions_are_skipped(filename, mocker):
    """With DOCLING configured, each parametrized filename is skipped and an extension is reported."""
    mocker.patch("app.config.config.ETL_SERVICE", "DOCLING")
    item = {"name": filename, "file": {"mimeType": "application/octet-stream"}}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is True, f"{filename} should be skipped"
    assert ext is not None
@pytest.mark.parametrize("filename", [
@ -54,9 +64,9 @@ def test_universal_files_are_not_skipped(filename, mocker):
for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"):
mocker.patch("app.config.config.ETL_SERVICE", service)
item = {"name": filename, "file": {"mimeType": "application/octet-stream"}}
assert should_skip_file(item) is False, (
f"{filename} should NOT be skipped with {service}"
)
skip, ext = should_skip_file(item)
assert skip is False, f"{filename} should NOT be skipped with {service}"
assert ext is None
@pytest.mark.parametrize("filename,service,expected_skip", [
@ -70,6 +80,20 @@ def test_universal_files_are_not_skipped(filename, mocker):
def test_parser_specific_extensions(filename, service, expected_skip, mocker):
    """The skip decision for a OneDrive file matches the expectation for the given ETL service."""
    mocker.patch("app.config.config.ETL_SERVICE", service)
    item = {"name": filename, "file": {"mimeType": "application/octet-stream"}}
    # Unpack the (skip, extension) tuple rather than comparing it to a bool.
    skip, ext = should_skip_file(item)
    assert skip is expected_skip, (
        f"{filename} with {service}: expected skip={expected_skip}"
    )
    # An extension is reported exactly when the file is skipped.
    if expected_skip:
        assert ext is not None
    else:
        assert ext is None
def test_returns_unsupported_extension(mocker):
    """A OneDrive file skipped for its extension reports that extension string."""
    mocker.patch("app.config.config.ETL_SERVICE", "DOCLING")
    drive_item = {"name": "mail.eml", "file": {"mimeType": "application/octet-stream"}}
    should_skip, extension = should_skip_file(drive_item)
    assert should_skip is True
    assert extension == ".eml"