diff --git a/surfsense_backend/app/connectors/dropbox/file_types.py b/surfsense_backend/app/connectors/dropbox/file_types.py index 13209ffd2..d26306665 100644 --- a/surfsense_backend/app/connectors/dropbox/file_types.py +++ b/surfsense_backend/app/connectors/dropbox/file_types.py @@ -42,18 +42,25 @@ def is_paper_file(item: dict) -> bool: return ext == PAPER_EXTENSION -def should_skip_file(item: dict) -> bool: +def should_skip_file(item: dict) -> tuple[bool, str | None]: """Skip folders and truly non-indexable files. Paper docs are non-downloadable but exportable, so they are NOT skipped. + Returns (should_skip, unsupported_extension_or_None). """ if is_folder(item): - return True + return True, None if is_paper_file(item): - return False + return False, None if not item.get("is_downloadable", True): - return True + return True, None + + from pathlib import PurePosixPath + from app.config import config as app_config name = item.get("name", "") - return should_skip_for_service(name, app_config.ETL_SERVICE) + if should_skip_for_service(name, app_config.ETL_SERVICE): + ext = PurePosixPath(name).suffix.lower() + return True, ext + return False, None diff --git a/surfsense_backend/app/connectors/google_drive/file_types.py b/surfsense_backend/app/connectors/google_drive/file_types.py index 73f016ceb..75dc1d4b3 100644 --- a/surfsense_backend/app/connectors/google_drive/file_types.py +++ b/surfsense_backend/app/connectors/google_drive/file_types.py @@ -48,11 +48,19 @@ def should_skip_file(mime_type: str) -> bool: return mime_type in [GOOGLE_FOLDER, GOOGLE_SHORTCUT] -def should_skip_by_extension(filename: str) -> bool: - """Return True if the file extension is not parseable by the configured ETL service.""" +def should_skip_by_extension(filename: str) -> tuple[bool, str | None]: + """Check if the file extension is not parseable by the configured ETL service. + + Returns (should_skip, unsupported_extension_or_None). + """ + from pathlib import PurePosixPath + from app.config import config as app_config - return should_skip_for_service(filename, app_config.ETL_SERVICE) + if should_skip_for_service(filename, app_config.ETL_SERVICE): + ext = PurePosixPath(filename).suffix.lower() + return True, ext + return False, None def get_export_mime_type(mime_type: str) -> str | None: diff --git a/surfsense_backend/app/connectors/onedrive/file_types.py b/surfsense_backend/app/connectors/onedrive/file_types.py index f9c147da8..942b0be73 100644 --- a/surfsense_backend/app/connectors/onedrive/file_types.py +++ b/surfsense_backend/app/connectors/onedrive/file_types.py @@ -40,18 +40,28 @@ def is_folder(item: dict) -> bool: return ONEDRIVE_FOLDER_FACET in item -def should_skip_file(item: dict) -> bool: - """Skip folders, OneNote files, remote items (shared links), packages, and unsupported extensions.""" +def should_skip_file(item: dict) -> tuple[bool, str | None]: + """Skip folders, OneNote files, remote items, packages, and unsupported extensions. + + Returns (should_skip, unsupported_extension_or_None). + The second element is only set when the skip is due to an unsupported extension. + """ if is_folder(item): - return True + return True, None if "remoteItem" in item: - return True + return True, None if "package" in item: - return True + return True, None mime = item.get("file", {}).get("mimeType", "") if mime in SKIP_MIME_TYPES: - return True + return True, None + + from pathlib import PurePosixPath + from app.config import config as app_config name = item.get("name", "") - return should_skip_for_service(name, app_config.ETL_SERVICE) + if should_skip_for_service(name, app_config.ETL_SERVICE): + ext = PurePosixPath(name).suffix.lower() + return True, ext + return False, None diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index d208ff910..a30eb7297 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -2477,6 +2477,8 @@ async def run_google_drive_indexing( stage="fetching", ) + total_unsupported = 0 + # Index each folder with indexing options for folder in items.folders: try: @@ -2484,6 +2486,7 @@ async def run_google_drive_indexing( indexed_count, skipped_count, error_message, + unsupported_count, ) = await index_google_drive_files( session, connector_id, @@ -2497,6 +2500,7 @@ async def run_google_drive_indexing( include_subfolders=indexing_options.include_subfolders, ) total_skipped += skipped_count + total_unsupported += unsupported_count if error_message: errors.append(f"Folder '{folder.name}': {error_message}") else: @@ -2572,6 +2576,7 @@ async def run_google_drive_indexing( indexed_count=total_indexed, error_message=error_message, skipped_count=total_skipped, + unsupported_count=total_unsupported, ) except Exception as e: @@ -2642,7 +2647,7 @@ async def run_onedrive_indexing( stage="fetching", ) - total_indexed, total_skipped, error_message = await index_onedrive_files( + total_indexed, total_skipped, error_message, total_unsupported = await index_onedrive_files( session, connector_id, search_space_id, @@ -2683,6 +2688,7 @@ async def run_onedrive_indexing( indexed_count=total_indexed, error_message=error_message, skipped_count=total_skipped, + unsupported_count=total_unsupported, ) except Exception as e: @@ -2750,7 +2756,7 @@ async def run_dropbox_indexing( stage="fetching", ) - total_indexed, total_skipped, error_message = await index_dropbox_files( + total_indexed, total_skipped, error_message, total_unsupported = await index_dropbox_files( session, connector_id, search_space_id, @@ -2791,6 +2797,7 @@ async def run_dropbox_indexing( indexed_count=total_indexed, error_message=error_message, skipped_count=total_skipped, + unsupported_count=total_unsupported, ) except Exception as e: diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 5e40a3b42..5ffee12d7 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -421,6 +421,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): error_message: str | None = None, is_warning: bool = False, skipped_count: int | None = None, + unsupported_count: int | None = None, ) -> Notification: """ Update notification when connector indexing completes. @@ -428,10 +429,11 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): Args: session: Database session notification: Notification to update - indexed_count: Total number of items indexed + indexed_count: Total number of files indexed error_message: Error message if indexing failed, or warning message (optional) is_warning: If True, treat error_message as a warning (success case) rather than an error - skipped_count: Number of items skipped (e.g., duplicates) - optional + skipped_count: Number of files skipped (e.g., unchanged) - optional + unsupported_count: Number of files skipped because the ETL parser doesn't support them Returns: Updated notification @@ -440,52 +442,45 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): "connector_name", "Connector" ) - # Build the skipped text if there are skipped items - skipped_text = "" - if skipped_count and skipped_count > 0: - skipped_item_text = "item" if skipped_count == 1 else "items" - skipped_text = ( - f" ({skipped_count} {skipped_item_text} skipped - already indexed)" - ) + unsupported_text = "" + if unsupported_count and unsupported_count > 0: + file_word = "file was" if unsupported_count == 1 else "files were" + unsupported_text = f" {unsupported_count} {file_word} not supported." - # If there's an error message but items were indexed, treat it as a warning (partial success) - # If is_warning is True, treat it as success even with 0 items (e.g., duplicates found) - # Otherwise, treat it as a failure if error_message: if indexed_count > 0: - # Partial success with warnings (e.g., duplicate content from other connectors) title = f"Ready: {connector_name}" - item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}" + file_text = "file" if indexed_count == 1 else "files" + message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}" status = "completed" elif is_warning: - # Warning case (e.g., duplicates found) - treat as success title = f"Ready: {connector_name}" - message = f"Sync completed{skipped_text}. {error_message}" + message = f"Sync complete.{unsupported_text} {error_message}" status = "completed" else: - # Complete failure title = f"Failed: {connector_name}" message = f"Sync failed: {error_message}" + if unsupported_text: + message += unsupported_text status = "failed" else: title = f"Ready: {connector_name}" if indexed_count == 0: - if skipped_count and skipped_count > 0: - skipped_item_text = "item" if skipped_count == 1 else "items" - message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)." + if unsupported_count and unsupported_count > 0: + message = f"Sync complete.{unsupported_text}" else: - message = "Already up to date! No new items to sync." + message = "Already up to date!" else: - item_text = "item" if indexed_count == 1 else "items" - message = ( - f"Now searchable! {indexed_count} {item_text} synced{skipped_text}." - ) + file_text = "file" if indexed_count == 1 else "files" + message = f"Now searchable! {indexed_count} {file_text} synced." + if unsupported_text: + message += unsupported_text status = "completed" metadata_updates = { "indexed_count": indexed_count, "skipped_count": skipped_count or 0, + "unsupported_count": unsupported_count or 0, "sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed", diff --git a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py index d116cc264..9e7fe1cfb 100644 --- a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py @@ -51,7 +51,10 @@ async def _should_skip_file( file_id = file.get("id", "") file_name = file.get("name", "Unknown") - if skip_item(file): + skip, unsup_ext = skip_item(file) + if skip: + if unsup_ext: + return True, f"unsupported:{unsup_ext}" return True, "folder/non-downloadable" if not file_id: return True, "missing file_id" @@ -287,7 +290,7 @@ async def _index_with_delta_sync( max_files: int, on_heartbeat_callback: HeartbeatCallbackType | None = None, enable_summary: bool = True, -) -> tuple[int, int, str]: +) -> tuple[int, int, int, str]: """Delta sync using Dropbox cursor-based change tracking. Returns (indexed_count, skipped_count, new_cursor). @@ -309,12 +312,13 @@ async def _index_with_delta_sync( if not entries: logger.info("No changes detected since last sync") - return 0, 0, new_cursor or cursor + return 0, 0, 0, new_cursor or cursor logger.info(f"Processing {len(entries)} change entries") renamed_count = 0 skipped = 0 + unsupported_count = 0 files_to_download: list[dict] = [] files_processed = 0 @@ -339,7 +343,9 @@ async def _index_with_delta_sync( skip, msg = await _should_skip_file(session, entry, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 @@ -360,9 +366,10 @@ async def _index_with_delta_sync( indexed = renamed_count + batch_indexed logger.info( - f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed" + f"Delta sync complete: {indexed} indexed, {skipped} skipped, " + f"{unsupported_count} unsupported, {failed} failed" ) - return indexed, skipped, new_cursor or cursor + return indexed, skipped, unsupported_count, new_cursor or cursor async def _index_full_scan( @@ -380,8 +387,11 @@ async def _index_full_scan( incremental_sync: bool = True, on_heartbeat_callback: HeartbeatCallbackType | None = None, enable_summary: bool = True, -) -> tuple[int, int]: - """Full scan indexing of a folder.""" +) -> tuple[int, int, int]: + """Full scan indexing of a folder. + + Returns (indexed, skipped, unsupported_count). + """ await task_logger.log_task_progress( log_entry, f"Starting full scan of folder: {folder_name}", @@ -401,6 +411,7 @@ async def _index_full_scan( renamed_count = 0 skipped = 0 + unsupported_count = 0 files_to_download: list[dict] = [] all_files, error = await get_files_in_folder( @@ -420,14 +431,21 @@ async def _index_full_scan( if incremental_sync: skip, msg = await _should_skip_file(session, file, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 continue - elif skip_item(file): - skipped += 1 - continue + else: + item_skip, item_unsup = skip_item(file) + if item_skip: + if item_unsup: + unsupported_count += 1 + else: + skipped += 1 + continue file_pages = PageLimitService.estimate_pages_from_metadata( file.get("name", ""), file.get("size") @@ -466,9 +484,10 @@ async def _index_full_scan( indexed = renamed_count + batch_indexed logger.info( - f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed" + f"Full scan complete: {indexed} indexed, {skipped} skipped, " + f"{unsupported_count} unsupported, {failed} failed" ) - return indexed, skipped + return indexed, skipped, unsupported_count async def _index_selected_files( @@ -493,6 +512,7 @@ async def _index_selected_files( errors: list[str] = [] renamed_count = 0 skipped = 0 + unsupported_count = 0 for file_path, file_name in file_paths: file, error = await get_file_by_path(dropbox_client, file_path) @@ -504,14 +524,21 @@ async def _index_selected_files( if incremental_sync: skip, msg = await _should_skip_file(session, file, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 continue - elif skip_item(file): - skipped += 1 - continue + else: + item_skip, item_unsup = skip_item(file) + if item_skip: + if item_unsup: + unsupported_count += 1 + else: + skipped += 1 + continue file_pages = PageLimitService.estimate_pages_from_metadata( file.get("name", ""), file.get("size") @@ -543,7 +570,7 @@ async def _index_selected_files( user_id, pages_to_deduct, allow_exceed=True ) - return renamed_count + batch_indexed, skipped, errors + return renamed_count + batch_indexed, skipped, unsupported_count, errors async def index_dropbox_files( @@ -552,7 +579,7 @@ async def index_dropbox_files( search_space_id: int, user_id: str, items_dict: dict, -) -> tuple[int, int, str | None]: +) -> tuple[int, int, str | None, int]: """Index Dropbox files for a specific connector. items_dict format: @@ -583,7 +610,7 @@ async def index_dropbox_files( await task_logger.log_task_failure( log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} ) - return 0, 0, error_msg + return 0, 0, error_msg, 0 token_encrypted = connector.config.get("_token_encrypted", False) if token_encrypted and not config.SECRET_KEY: @@ -594,7 +621,7 @@ async def index_dropbox_files( "Missing SECRET_KEY", {"error_type": "MissingSecretKey"}, ) - return 0, 0, error_msg + return 0, 0, error_msg, 0 connector_enable_summary = getattr(connector, "enable_summary", True) dropbox_client = DropboxClient(session, connector_id) @@ -609,6 +636,7 @@ async def index_dropbox_files( total_indexed = 0 total_skipped = 0 + total_unsupported = 0 selected_files = items_dict.get("files", []) if selected_files: @@ -616,7 +644,7 @@ async def index_dropbox_files( (f.get("path", f.get("path_lower", f.get("id", ""))), f.get("name")) for f in selected_files ] - indexed, skipped, file_errors = await _index_selected_files( + indexed, skipped, unsupported, file_errors = await _index_selected_files( dropbox_client, session, file_tuples, @@ -628,6 +656,7 @@ async def index_dropbox_files( ) total_indexed += indexed total_skipped += skipped + total_unsupported += unsupported if file_errors: logger.warning( f"File indexing errors for connector {connector_id}: {file_errors}" @@ -649,7 +678,7 @@ async def index_dropbox_files( if can_use_delta: logger.info(f"Using delta sync for folder {folder_name}") - indexed, skipped, new_cursor = await _index_with_delta_sync( + indexed, skipped, unsup, new_cursor = await _index_with_delta_sync( dropbox_client, session, connector_id, @@ -662,9 +691,10 @@ async def index_dropbox_files( enable_summary=connector_enable_summary, ) folder_cursors[folder_path] = new_cursor + total_unsupported += unsup else: logger.info(f"Using full scan for folder {folder_name}") - indexed, skipped = await _index_full_scan( + indexed, skipped, unsup = await _index_full_scan( dropbox_client, session, connector_id, @@ -679,6 +709,7 @@ async def index_dropbox_files( incremental_sync=incremental_sync, enable_summary=connector_enable_summary, ) + total_unsupported += unsup total_indexed += indexed total_skipped += skipped @@ -708,12 +739,14 @@ async def index_dropbox_files( await task_logger.log_task_success( log_entry, f"Successfully completed Dropbox indexing for connector {connector_id}", - {"files_processed": total_indexed, "files_skipped": total_skipped}, + {"files_processed": total_indexed, "files_skipped": total_skipped, "files_unsupported": total_unsupported}, ) logger.info( - f"Dropbox indexing completed: {total_indexed} indexed, {total_skipped} skipped" + f"Dropbox indexing completed: {total_indexed} indexed, " + f"{total_skipped} skipped, {total_unsupported} unsupported" ) - return total_indexed, total_skipped, None + + return total_indexed, total_skipped, None, total_unsupported except SQLAlchemyError as db_error: await session.rollback() @@ -724,7 +757,7 @@ async def index_dropbox_files( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}", 0 except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -734,4 +767,4 @@ async def index_dropbox_files( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Dropbox files: {e!s}", exc_info=True) - return 0, 0, f"Failed to index Dropbox files: {e!s}" + return 0, 0, f"Failed to index Dropbox files: {e!s}", 0 diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 9c53092f5..b2afbb9c9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -81,8 +81,9 @@ async def _should_skip_file( if skip_mime(mime_type): return True, "folder/shortcut" - if should_skip_by_extension(file_name): - return True, "unsupported extension" + ext_skip, unsup_ext = should_skip_by_extension(file_name) + if ext_skip: + return True, f"unsupported:{unsup_ext}" if not file_id: return True, "missing file_id" @@ -490,6 +491,7 @@ async def _index_selected_files( errors: list[str] = [] renamed_count = 0 skipped = 0 + unsupported_count = 0 for file_id, file_name in file_ids: file, error = await get_file_by_id(drive_client, file_id) @@ -500,7 +502,9 @@ async def _index_selected_files( skip, msg = await _should_skip_file(session, file, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 @@ -544,7 +548,7 @@ async def _index_selected_files( user_id, pages_to_deduct, allow_exceed=True ) - return renamed_count + batch_indexed, skipped, errors + return renamed_count + batch_indexed, skipped, unsupported_count, errors # --------------------------------------------------------------------------- @@ -567,8 +571,11 @@ async def _index_full_scan( include_subfolders: bool = False, on_heartbeat_callback: HeartbeatCallbackType | None = None, enable_summary: bool = True, -) -> tuple[int, int]: - """Full scan indexing of a folder.""" +) -> tuple[int, int, int]: + """Full scan indexing of a folder. + + Returns (indexed, skipped, unsupported_count). + """ await task_logger.log_task_progress( log_entry, f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})", @@ -590,6 +597,7 @@ async def _index_full_scan( renamed_count = 0 skipped = 0 + unsupported_count = 0 files_processed = 0 files_to_download: list[dict] = [] folders_to_process = [(folder_id, folder_name)] @@ -630,7 +638,9 @@ async def _index_full_scan( skip, msg = await _should_skip_file(session, file, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 @@ -703,9 +713,10 @@ async def _index_full_scan( indexed = renamed_count + batch_indexed logger.info( - f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed" + f"Full scan complete: {indexed} indexed, {skipped} skipped, " + f"{unsupported_count} unsupported, {failed} failed" ) - return indexed, skipped + return indexed, skipped, unsupported_count async def _index_with_delta_sync( @@ -723,8 +734,11 @@ async def _index_with_delta_sync( include_subfolders: bool = False, on_heartbeat_callback: HeartbeatCallbackType | None = None, enable_summary: bool = True, -) -> tuple[int, int]: - """Delta sync using change tracking.""" +) -> tuple[int, int, int]: + """Delta sync using change tracking. + + Returns (indexed, skipped, unsupported_count). + """ await task_logger.log_task_progress( log_entry, f"Starting delta sync from token: {start_page_token[:20]}...", @@ -759,6 +773,7 @@ async def _index_with_delta_sync( renamed_count = 0 skipped = 0 + unsupported_count = 0 files_to_download: list[dict] = [] files_processed = 0 @@ -780,7 +795,9 @@ async def _index_with_delta_sync( skip, msg = await _should_skip_file(session, file, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 @@ -837,9 +854,10 @@ async def _index_with_delta_sync( indexed = renamed_count + batch_indexed logger.info( - f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed" + f"Delta sync complete: {indexed} indexed, {skipped} skipped, " + f"{unsupported_count} unsupported, {failed} failed" ) - return indexed, skipped + return indexed, skipped, unsupported_count # --------------------------------------------------------------------------- @@ -859,8 +877,11 @@ async def index_google_drive_files( max_files: int = 500, include_subfolders: bool = False, on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, int, str | None]: - """Index Google Drive files for a specific connector.""" +) -> tuple[int, int, str | None, int]: + """Index Google Drive files for a specific connector. + + Returns (indexed, skipped, error_or_none, unsupported_count). + """ task_logger = TaskLoggingService(session, search_space_id) log_entry = await task_logger.log_task_start( task_name="google_drive_files_indexing", @@ -886,7 +907,7 @@ async def index_google_drive_files( await task_logger.log_task_failure( log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} ) - return 0, 0, error_msg + return 0, 0, error_msg, 0 await task_logger.log_task_progress( log_entry, @@ -905,7 +926,7 @@ async def index_google_drive_files( "Missing Composio account", {"error_type": "MissingComposioAccount"}, ) - return 0, 0, error_msg + return 0, 0, error_msg, 0 pre_built_credentials = build_composio_credentials(connected_account_id) else: token_encrypted = connector.config.get("_token_encrypted", False) @@ -920,6 +941,7 @@ async def index_google_drive_files( 0, 0, "SECRET_KEY not configured but credentials are marked as encrypted", + 0, ) connector_enable_summary = getattr(connector, "enable_summary", True) @@ -932,7 +954,7 @@ async def index_google_drive_files( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "MissingParameter"} ) - return 0, 0, error_msg + return 0, 0, error_msg, 0 target_folder_id = folder_id target_folder_name = folder_name or "Selected Folder" @@ -943,9 +965,11 @@ async def index_google_drive_files( use_delta_sync and start_page_token and connector.last_indexed_at ) + documents_unsupported = 0 + if can_use_delta: logger.info(f"Using delta sync for connector {connector_id}") - documents_indexed, documents_skipped = await _index_with_delta_sync( + documents_indexed, documents_skipped, du = await _index_with_delta_sync( drive_client, session, connector, @@ -961,8 +985,9 @@ async def index_google_drive_files( on_heartbeat_callback, connector_enable_summary, ) + documents_unsupported += du logger.info("Running reconciliation scan after delta sync") - ri, rs = await _index_full_scan( + ri, rs, ru = await _index_full_scan( drive_client, session, connector, @@ -980,9 +1005,10 @@ async def index_google_drive_files( ) documents_indexed += ri documents_skipped += rs + documents_unsupported += ru else: logger.info(f"Using full scan for connector {connector_id}") - documents_indexed, documents_skipped = await _index_full_scan( + documents_indexed, documents_skipped, documents_unsupported = await _index_full_scan( drive_client, session, connector, @@ -1017,14 +1043,17 @@ async def index_google_drive_files( { "files_processed": documents_indexed, "files_skipped": documents_skipped, + "files_unsupported": documents_unsupported, "sync_type": "delta" if can_use_delta else "full", "folder": target_folder_name, }, ) logger.info( - f"Google Drive indexing completed: {documents_indexed} indexed, {documents_skipped} skipped" + f"Google Drive indexing completed: {documents_indexed} indexed, " + f"{documents_skipped} skipped, {documents_unsupported} unsupported" ) - return documents_indexed, documents_skipped, None + + return documents_indexed, documents_skipped, None, documents_unsupported except SQLAlchemyError as db_error: await session.rollback() @@ -1035,7 +1064,7 @@ async def index_google_drive_files( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}", 0 except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -1045,7 +1074,7 @@ async def index_google_drive_files( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True) - return 0, 0, f"Failed to index Google Drive files: {e!s}" + return 0, 0, f"Failed to index Google Drive files: {e!s}", 0 async def index_google_drive_single_file( @@ -1247,7 +1276,7 @@ async def index_google_drive_selected_files( session, connector_id, credentials=pre_built_credentials ) - indexed, skipped, errors = await _index_selected_files( + indexed, skipped, unsupported, errors = await _index_selected_files( drive_client, session, files, @@ -1258,6 +1287,11 @@ async def index_google_drive_selected_files( on_heartbeat=on_heartbeat_callback, ) + if unsupported > 0: + file_text = "file was" if unsupported == 1 else "files were" + unsup_msg = f"{unsupported} {file_text} not supported" + errors.append(unsup_msg) + await session.commit() if errors: @@ -1265,7 +1299,7 @@ async def index_google_drive_selected_files( log_entry, f"Batch file indexing completed with {len(errors)} error(s)", "; ".join(errors), - {"indexed": indexed, "skipped": skipped, "error_count": len(errors)}, + {"indexed": indexed, "skipped": skipped, "unsupported": unsupported, "error_count": len(errors)}, ) else: await task_logger.log_task_success( diff --git a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py index 2301b6260..db42773fe 100644 --- a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py @@ -56,7 +56,10 @@ async def _should_skip_file( file_id = file.get("id") file_name = file.get("name", "Unknown") - if skip_item(file): + skip, unsup_ext = skip_item(file) + if skip: + if unsup_ext: + return True, f"unsupported:{unsup_ext}" return True, "folder/onenote/remote" if not file_id: return True, "missing file_id" @@ -301,6 +304,7 @@ async def _index_selected_files( errors: list[str] = [] renamed_count = 0 skipped = 0 + unsupported_count = 0 for file_id, file_name in file_ids: file, error = await get_file_by_id(onedrive_client, file_id) @@ -311,7 +315,9 @@ async def _index_selected_files( skip, msg = await _should_skip_file(session, file, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 @@ -347,7 +353,7 @@ async def _index_selected_files( user_id, pages_to_deduct, allow_exceed=True ) - return renamed_count + batch_indexed, skipped, errors + return renamed_count + batch_indexed, skipped, unsupported_count, errors # --------------------------------------------------------------------------- @@ -369,8 +375,11 @@ async def _index_full_scan( include_subfolders: bool = True, on_heartbeat_callback: HeartbeatCallbackType | None = None, enable_summary: bool = True, -) -> tuple[int, int]: - """Full scan indexing of a folder.""" +) -> tuple[int, int, int]: + """Full scan indexing of a folder. + + Returns (indexed, skipped, unsupported_count). + """ await task_logger.log_task_progress( log_entry, f"Starting full scan of folder: {folder_name}", @@ -389,6 +398,7 @@ async def _index_full_scan( renamed_count = 0 skipped = 0 + unsupported_count = 0 files_to_download: list[dict] = [] all_files, error = await get_files_in_folder( @@ -407,7 +417,9 @@ async def _index_full_scan( for file in all_files[:max_files]: skip, msg = await _should_skip_file(session, file, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 @@ -450,9 +462,10 @@ async def _index_full_scan( indexed = renamed_count + batch_indexed logger.info( - f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed" + f"Full scan complete: {indexed} indexed, {skipped} skipped, " + f"{unsupported_count} unsupported, {failed} failed" ) - return indexed, skipped + return indexed, skipped, unsupported_count async def _index_with_delta_sync( @@ -468,8 +481,11 @@ async def _index_with_delta_sync( max_files: int, on_heartbeat_callback: HeartbeatCallbackType | None = None, enable_summary: bool = True, -) -> tuple[int, int, str | None]: - """Delta sync using OneDrive change tracking. Returns (indexed, skipped, new_delta_link).""" +) -> tuple[int, int, int, str | None]: + """Delta sync using OneDrive change tracking. + + Returns (indexed, skipped, unsupported_count, new_delta_link). + """ await task_logger.log_task_progress( log_entry, "Starting delta sync", @@ -489,7 +505,7 @@ async def _index_with_delta_sync( if not changes: logger.info("No changes detected since last sync") - return 0, 0, new_delta_link + return 0, 0, 0, new_delta_link logger.info(f"Processing {len(changes)} delta changes") @@ -501,6 +517,7 @@ async def _index_with_delta_sync( renamed_count = 0 skipped = 0 + unsupported_count = 0 files_to_download: list[dict] = [] files_processed = 0 @@ -523,7 +540,9 @@ async def _index_with_delta_sync( skip, msg = await _should_skip_file(session, change, search_space_id) if skip: - if msg and "renamed" in msg.lower(): + if msg and msg.startswith("unsupported:"): + unsupported_count += 1 + elif msg and "renamed" in msg.lower(): renamed_count += 1 else: skipped += 1 @@ -566,9 +585,10 @@ async def _index_with_delta_sync( indexed = renamed_count + batch_indexed logger.info( - f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed" + f"Delta sync complete: {indexed} indexed, {skipped} skipped, " + f"{unsupported_count} unsupported, {failed} failed" ) - return indexed, skipped, new_delta_link + return indexed, skipped, unsupported_count, new_delta_link # --------------------------------------------------------------------------- @@ -582,7 +602,7 @@ async def index_onedrive_files( search_space_id: int, user_id: str, items_dict: dict, -) -> tuple[int, int, str | None]: +) -> tuple[int, int, str | None, int]: """Index OneDrive files for a specific connector. items_dict format: @@ -609,7 +629,7 @@ async def index_onedrive_files( await task_logger.log_task_failure( log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} ) - return 0, 0, error_msg + return 0, 0, error_msg, 0 token_encrypted = connector.config.get("_token_encrypted", False) if token_encrypted and not config.SECRET_KEY: @@ -620,7 +640,7 @@ async def index_onedrive_files( "Missing SECRET_KEY", {"error_type": "MissingSecretKey"}, ) - return 0, 0, error_msg + return 0, 0, error_msg, 0 connector_enable_summary = getattr(connector, "enable_summary", True) onedrive_client = OneDriveClient(session, connector_id) @@ -632,12 +652,13 @@ async def index_onedrive_files( total_indexed = 0 total_skipped = 0 + total_unsupported = 0 # Index selected individual files selected_files = items_dict.get("files", []) if selected_files: file_tuples = [(f["id"], f.get("name")) for f in selected_files] - indexed, skipped, _errors = await _index_selected_files( + indexed, skipped, unsupported, _errors = await _index_selected_files( onedrive_client, session, file_tuples, @@ -648,6 +669,7 @@ async def index_onedrive_files( ) total_indexed += indexed total_skipped += skipped + total_unsupported += unsupported # Index selected folders folders = items_dict.get("folders", []) @@ -661,7 +683,7 @@ async def index_onedrive_files( if can_use_delta: logger.info(f"Using delta sync for folder {folder_name}") - indexed, skipped, new_delta_link = await _index_with_delta_sync( + indexed, skipped, unsup, new_delta_link = await _index_with_delta_sync( onedrive_client, session, connector_id, @@ -676,6 +698,7 @@ async def index_onedrive_files( ) total_indexed += indexed total_skipped += skipped + total_unsupported += unsup if new_delta_link: await session.refresh(connector) @@ -685,7 +708,7 @@ async def index_onedrive_files( flag_modified(connector, "config") # Reconciliation full scan - ri, rs = await _index_full_scan( + ri, rs, ru = await _index_full_scan( onedrive_client, session, connector_id, @@ -701,9 +724,10 @@ async def index_onedrive_files( ) total_indexed += ri total_skipped += rs + total_unsupported += ru else: logger.info(f"Using full scan for folder {folder_name}") - indexed, skipped = await _index_full_scan( + indexed, skipped, unsup = await _index_full_scan( onedrive_client, session, connector_id, @@ -719,6 +743,7 @@ async def index_onedrive_files( ) total_indexed += indexed total_skipped += skipped + total_unsupported += unsup # Store new delta link for this folder _, new_delta_link, _ = await onedrive_client.get_delta(folder_id=folder_id) @@ -737,12 +762,14 @@ async def index_onedrive_files( await task_logger.log_task_success( log_entry, f"Successfully completed OneDrive indexing for connector {connector_id}", - {"files_processed": total_indexed, "files_skipped": total_skipped}, + {"files_processed": total_indexed, "files_skipped": total_skipped, "files_unsupported": total_unsupported}, ) logger.info( - f"OneDrive indexing completed: {total_indexed} indexed, {total_skipped} skipped" + f"OneDrive indexing completed: {total_indexed} indexed, " + f"{total_skipped} skipped, {total_unsupported} unsupported" ) - return total_indexed, total_skipped, None + + return total_indexed, total_skipped, None, total_unsupported except SQLAlchemyError as db_error: await session.rollback() @@ -753,7 +780,7 @@ async def index_onedrive_files( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}", 0 except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -763,4 +790,4 @@ async def index_onedrive_files( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index OneDrive files: {e!s}", exc_info=True) - return 0, 0, f"Failed to index OneDrive files: {e!s}" + return 0, 0, f"Failed to index OneDrive files: {e!s}", 0 diff --git a/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py index 8572fa8ea..14c16fce4 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py @@ -265,7 +265,10 @@ def full_scan_mocks(mock_dropbox_client, monkeypatch): async def _fake_skip(session, file, search_space_id): from app.connectors.dropbox.file_types import should_skip_file as _skip - if _skip(file): + item_skip, unsup_ext = _skip(file) + if item_skip: + if unsup_ext: + return True, f"unsupported:{unsup_ext}" return True, "folder/non-downloadable" return skip_results.get(file.get("id", ""), (False, None)) @@ -541,7 +544,7 @@ async def test_delta_sync_deletions_call_remove_document(monkeypatch): mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() - indexed, skipped, cursor = await _index_with_delta_sync( + indexed, skipped, unsupported, cursor = await _index_with_delta_sync( mock_client, AsyncMock(), _CONNECTOR_ID, @@ -578,7 +581,7 @@ async def test_delta_sync_upserts_filtered_and_downloaded(monkeypatch): mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() - indexed, skipped, cursor = await _index_with_delta_sync( + indexed, skipped, unsupported, cursor = await _index_with_delta_sync( mock_client, AsyncMock(), _CONNECTOR_ID, @@ -628,7 +631,7 @@ async def test_delta_sync_mix_deletions_and_upserts(monkeypatch): mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() - indexed, skipped, cursor = await _index_with_delta_sync( + indexed, skipped, unsupported, cursor = await _index_with_delta_sync( mock_client, AsyncMock(), _CONNECTOR_ID, @@ -662,7 +665,7 @@ async def test_delta_sync_returns_new_cursor(monkeypatch): mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() - indexed, skipped, cursor = await _index_with_delta_sync( + indexed, skipped, unsupported, cursor = await _index_with_delta_sync( mock_client, AsyncMock(), _CONNECTOR_ID, diff --git a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py index 20bd3f3d6..b830e9773 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py @@ -497,7 +497,7 @@ async def test_delta_sync_removals_serial_rest_parallel(monkeypatch): mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() - indexed, skipped = await _index_with_delta_sync( + indexed, skipped, unsupported = await _index_with_delta_sync( MagicMock(), mock_session, MagicMock(), diff --git a/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py index b31a9557f..e2996ce9d 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py @@ -384,7 +384,7 @@ async def test_gdrive_full_scan_skips_over_quota(gdrive_full_scan_mocks, monkeyp m["download_mock"].return_value = ([], 0) m["batch_mock"].return_value = ([], 2, 0) - _indexed, skipped = await _run_gdrive_full_scan(m) + _indexed, skipped, _unsup = await _run_gdrive_full_scan(m) call_files = m["download_mock"].call_args[0][1] assert len(call_files) == 2 @@ -459,7 +459,7 @@ async def test_gdrive_delta_sync_skips_over_quota(monkeypatch): mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() - _indexed, skipped = await _mod._index_with_delta_sync( + _indexed, skipped, _unsupported = await _mod._index_with_delta_sync( MagicMock(), session, MagicMock(), diff --git a/surfsense_backend/tests/unit/connectors/test_dropbox_file_types.py b/surfsense_backend/tests/unit/connectors/test_dropbox_file_types.py index e092872c5..74277d47c 100644 --- a/surfsense_backend/tests/unit/connectors/test_dropbox_file_types.py +++ b/surfsense_backend/tests/unit/connectors/test_dropbox_file_types.py @@ -14,17 +14,23 @@ pytestmark = pytest.mark.unit def test_folder_item_is_skipped(): item = {".tag": "folder", "name": "My Folder"} - assert should_skip_file(item) is True + skip, ext = should_skip_file(item) + assert skip is True + assert ext is None def test_paper_file_is_not_skipped(): item = {".tag": "file", "name": "notes.paper", "is_downloadable": False} - assert should_skip_file(item) is False + skip, ext = should_skip_file(item) + assert skip is False + assert ext is None def test_non_downloadable_item_is_skipped(): item = {".tag": "file", "name": "locked.gdoc", "is_downloadable": False} - assert should_skip_file(item) is True + skip, ext = should_skip_file(item) + assert skip is True + assert ext is None # --------------------------------------------------------------------------- @@ -49,7 +55,9 @@ def test_non_downloadable_item_is_skipped(): def test_non_parseable_extensions_are_skipped(filename, mocker): mocker.patch("app.config.config.ETL_SERVICE", "DOCLING") item = {".tag": "file", "name": filename} - assert should_skip_file(item) is True, f"{filename} should be skipped" + skip, ext = should_skip_file(item) + assert skip is True, f"{filename} should be skipped" + assert ext is not None @pytest.mark.parametrize( @@ -65,9 +73,9 @@ def test_parseable_documents_are_not_skipped(filename, mocker): for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"): mocker.patch("app.config.config.ETL_SERVICE", service) item = {".tag": "file", "name": filename} - assert should_skip_file(item) is False, ( - f"{filename} should NOT be skipped with {service}" - ) + skip, ext = should_skip_file(item) + assert skip is False, f"{filename} should NOT be skipped with {service}" + assert ext is None @pytest.mark.parametrize( @@ -79,9 +87,9 @@ def test_universal_images_are_not_skipped(filename, mocker): for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"): mocker.patch("app.config.config.ETL_SERVICE", service) item = {".tag": "file", "name": filename} - assert should_skip_file(item) is False, ( - f"{filename} should NOT be skipped with {service}" - ) + skip, ext = should_skip_file(item) + assert skip is False, f"{filename} should NOT be skipped with {service}" + assert ext is None @pytest.mark.parametrize("filename,service,expected_skip", [ @@ -111,6 +119,20 @@ def test_universal_images_are_not_skipped(filename, mocker): def test_parser_specific_extensions(filename, service, expected_skip, mocker): mocker.patch("app.config.config.ETL_SERVICE", service) item = {".tag": "file", "name": filename} - assert should_skip_file(item) is expected_skip, ( + skip, ext = should_skip_file(item) + assert skip is expected_skip, ( f"{filename} with {service}: expected skip={expected_skip}" ) + if expected_skip: + assert ext is not None + else: + assert ext is None + + +def test_returns_unsupported_extension(mocker): + """When a file is skipped due to unsupported extension, the ext string is returned.""" + mocker.patch("app.config.config.ETL_SERVICE", "DOCLING") + item = {".tag": "file", "name": "old.doc"} + skip, ext = should_skip_file(item) + assert skip is True + assert ext == ".doc" diff --git a/surfsense_backend/tests/unit/connectors/test_google_drive_file_types.py b/surfsense_backend/tests/unit/connectors/test_google_drive_file_types.py index 4ed7eb4db..5cd43736b 100644 --- a/surfsense_backend/tests/unit/connectors/test_google_drive_file_types.py +++ b/surfsense_backend/tests/unit/connectors/test_google_drive_file_types.py @@ -14,7 +14,8 @@ def test_unsupported_extensions_are_skipped_regardless_of_service(filename, mock """Truly unsupported files are skipped no matter which ETL service is configured.""" for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"): mocker.patch("app.config.config.ETL_SERVICE", service) - assert should_skip_by_extension(filename) is True + skip, ext = should_skip_by_extension(filename) + assert skip is True @pytest.mark.parametrize("filename", [ @@ -25,9 +26,9 @@ def test_universal_extensions_are_not_skipped(filename, mocker): """Files supported by all parsers (or handled by plaintext/direct_convert) are never skipped.""" for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"): mocker.patch("app.config.config.ETL_SERVICE", service) - assert should_skip_by_extension(filename) is False, ( - f"{filename} should NOT be skipped with {service}" - ) + skip, ext = should_skip_by_extension(filename) + assert skip is False, f"{filename} should NOT be skipped with {service}" + assert ext is None @pytest.mark.parametrize("filename,service,expected_skip", [ @@ -42,6 +43,19 @@ def test_universal_extensions_are_not_skipped(filename, mocker): ]) def test_parser_specific_extensions(filename, service, expected_skip, mocker): mocker.patch("app.config.config.ETL_SERVICE", service) - assert should_skip_by_extension(filename) is expected_skip, ( + skip, ext = should_skip_by_extension(filename) + assert skip is expected_skip, ( f"{filename} with {service}: expected skip={expected_skip}" ) + if expected_skip: + assert ext is not None, "unsupported extension should be returned" + else: + assert ext is None + + +def test_returns_unsupported_extension(mocker): + """When a file is skipped, the unsupported extension string is returned.""" + mocker.patch("app.config.config.ETL_SERVICE", "DOCLING") + skip, ext = should_skip_by_extension("macro.docm") + assert skip is True + assert ext == ".docm" diff --git a/surfsense_backend/tests/unit/connectors/test_onedrive_file_types.py b/surfsense_backend/tests/unit/connectors/test_onedrive_file_types.py index e73f799e2..61212b340 100644 --- a/surfsense_backend/tests/unit/connectors/test_onedrive_file_types.py +++ b/surfsense_backend/tests/unit/connectors/test_onedrive_file_types.py @@ -14,22 +14,30 @@ pytestmark = pytest.mark.unit def test_folder_is_skipped(): item = {"folder": {}, "name": "My Folder"} - assert should_skip_file(item) is True + skip, ext = should_skip_file(item) + assert skip is True + assert ext is None def test_remote_item_is_skipped(): item = {"remoteItem": {}, "name": "shared.docx"} - assert should_skip_file(item) is True + skip, ext = should_skip_file(item) + assert skip is True + assert ext is None def test_package_is_skipped(): item = {"package": {}, "name": "notebook"} - assert should_skip_file(item) is True + skip, ext = should_skip_file(item) + assert skip is True + assert ext is None def test_onenote_is_skipped(): item = {"name": "notes", "file": {"mimeType": "application/msonenote"}} - assert should_skip_file(item) is True + skip, ext = should_skip_file(item) + assert skip is True + assert ext is None # --------------------------------------------------------------------------- @@ -43,7 +51,9 @@ def test_onenote_is_skipped(): def test_unsupported_extensions_are_skipped(filename, mocker): mocker.patch("app.config.config.ETL_SERVICE", "DOCLING") item = {"name": filename, "file": {"mimeType": "application/octet-stream"}} - assert should_skip_file(item) is True, f"{filename} should be skipped" + skip, ext = should_skip_file(item) + assert skip is True, f"{filename} should be skipped" + assert ext is not None @pytest.mark.parametrize("filename", [ @@ -54,9 +64,9 @@ def test_universal_files_are_not_skipped(filename, mocker): for service in ("DOCLING", "LLAMACLOUD", "UNSTRUCTURED"): mocker.patch("app.config.config.ETL_SERVICE", service) item = {"name": filename, "file": {"mimeType": "application/octet-stream"}} - assert should_skip_file(item) is False, ( - f"{filename} should NOT be skipped with {service}" - ) + skip, ext = should_skip_file(item) + assert skip is False, f"{filename} should NOT be skipped with {service}" + assert ext is None @pytest.mark.parametrize("filename,service,expected_skip", [ @@ -70,6 +80,20 @@ def test_universal_files_are_not_skipped(filename, mocker): def test_parser_specific_extensions(filename, service, expected_skip, mocker): mocker.patch("app.config.config.ETL_SERVICE", service) item = {"name": filename, "file": {"mimeType": "application/octet-stream"}} - assert should_skip_file(item) is expected_skip, ( + skip, ext = should_skip_file(item) + assert skip is expected_skip, ( f"{filename} with {service}: expected skip={expected_skip}" ) + if expected_skip: + assert ext is not None + else: + assert ext is None + + +def test_returns_unsupported_extension(mocker): + """When a file is skipped due to unsupported extension, the ext string is returned.""" + mocker.patch("app.config.config.ETL_SERVICE", "DOCLING") + item = {"name": "mail.eml", "file": {"mimeType": "application/octet-stream"}} + skip, ext = should_skip_file(item) + assert skip is True + assert ext == ".eml"