diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py
index 3d4ddc19e..a3281eaea 100644
--- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py
@@ -14,7 +14,6 @@ no connector row is read.
 """
 
 import os
-import time
 from collections.abc import Awaitable, Callable
 from datetime import UTC, datetime
 from pathlib import Path
@@ -30,24 +29,16 @@ from app.db import (
     DocumentType,
     Folder,
 )
+from app.indexing_pipeline.connector_document import ConnectorDocument
+from app.indexing_pipeline.document_hashing import compute_identifier_hash
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
-from app.utils.document_converters import (
-    create_document_chunks,
-    embed_text,
-    generate_content_hash,
-    generate_document_summary,
-    generate_unique_identifier_hash,
-)
 from app.utils.document_versioning import create_version_snapshot
 
 from .base import (
-    build_document_metadata_string,
     check_document_by_unique_identifier,
-    check_duplicate_document_by_hash,
-    get_current_timestamp,
     logger,
-    safe_set_chunks,
 )
 
 PLAINTEXT_EXTENSIONS = frozenset({
@@ -89,7 +80,6 @@ def _needs_etl(filename: str) -> bool:
     return not _is_plaintext_file(filename) and not _is_audio_file(filename)
 
 HeartbeatCallbackType = Callable[[int], Awaitable[None]]
-HEARTBEAT_INTERVAL_SECONDS = 30
 
 DEFAULT_EXCLUDE_PATTERNS = [
     ".git",
@@ -210,6 +200,16 @@ async def _read_file_content(file_path: str, filename: str) -> str:
     return await _parse_file_to_markdown(file_path, filename)
 
 
+def _content_hash(content: str, search_space_id: int) -> str:
+    """SHA-256 hash of content scoped to a search space.
+
+    Matches the format used by ``compute_content_hash`` in the unified
+    pipeline so that dedup checks are consistent.
+    """
+    import hashlib
+    return hashlib.sha256(f"{search_space_id}:{content}".encode("utf-8")).hexdigest()
+
+
 async def _compute_file_content_hash(
     file_path: str, filename: str, search_space_id: int,
 ) -> tuple[str, str]:
@@ -218,8 +218,7 @@
     Returns (content_text, content_hash).
""" content = await _read_file_content(file_path, filename) - content_hash = generate_content_hash(content, search_space_id) - return content, content_hash + return content, _content_hash(content, search_space_id) async def _mirror_folder_structure( @@ -454,6 +453,40 @@ async def _cleanup_empty_folders( candidates = remaining +def _build_connector_doc( + title: str, + content: str, + relative_path: str, + folder_name: str, + *, + search_space_id: int, + user_id: str, + enable_summary: bool, +) -> ConnectorDocument: + """Build a ConnectorDocument from a local file's extracted content.""" + unique_id = f"{folder_name}:{relative_path}" + metadata = { + "folder_name": folder_name, + "file_path": relative_path, + "document_type": "Local Folder File", + "connector_type": "Local Folder", + } + fallback_summary = f"File: {title}\n\n{content[:4000]}" + + return ConnectorDocument( + title=title, + source_markdown=content, + unique_id=unique_id, + document_type=DocumentType.LOCAL_FOLDER_FILE, + search_space_id=search_space_id, + connector_id=None, + created_by_id=user_id, + should_summarize=enable_summary, + fallback_summary=fallback_summary, + metadata=metadata, + ) + + async def index_local_folder( session: AsyncSession, search_space_id: int, @@ -551,15 +584,13 @@ async def index_local_folder( indexed_count = 0 skipped_count = 0 failed_count = 0 - duplicate_count = 0 - - last_heartbeat_time = time.time() # ================================================================ - # PHASE 1: Analyze all files, create pending documents + # PHASE 1: Pre-filter files (mtime / content-hash), version changed # ================================================================ - files_to_process: list[dict] = [] - new_documents_created = False + connector_docs: list[ConnectorDocument] = [] + # Maps unique_id -> (relative_path, mtime) for post-pipeline folder_id assignment + file_meta_map: dict[str, dict] = {} seen_unique_hashes: set[str] = set() for file_info in files: @@ -568,8 +599,8 @@ async def index_local_folder( file_path_abs = file_info["path"] unique_identifier = f"{folder_name}:{relative_path}" - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.LOCAL_FOLDER_FILE, + unique_identifier_hash = compute_identifier_hash( + DocumentType.LOCAL_FOLDER_FILE.value, unique_identifier, search_space_id, ) @@ -612,94 +643,42 @@ async def index_local_folder( continue await create_version_snapshot(session, existing_document) + else: + try: + content, content_hash = await _compute_file_content_hash( + file_path_abs, file_info["relative_path"], search_space_id + ) + except Exception as read_err: + logger.warning(f"Could not read {file_path_abs}: {read_err}") + skipped_count += 1 + continue - files_to_process.append( - { - "document": existing_document, - "is_new": False, - "file_info": file_info, - "content": content, - "content_hash": content_hash, - "unique_identifier_hash": unique_identifier_hash, - "relative_path": relative_path, - "title": file_info["name"], - } - ) - continue + if not content.strip(): + skipped_count += 1 + continue - try: - content, content_hash = await _compute_file_content_hash( - file_path_abs, file_info["relative_path"], search_space_id - ) - except Exception as read_err: - logger.warning(f"Could not read {file_path_abs}: {read_err}") - skipped_count += 1 - continue - - if not content.strip(): - skipped_count += 1 - continue - - with session.no_autoflush: - dup = await check_duplicate_document_by_hash(session, content_hash) - if dup: - duplicate_count += 1 - skipped_count += 
1 - continue - - parent_dir = str(Path(relative_path).parent) - if parent_dir == ".": - parent_dir = "" - folder_id = folder_mapping.get(parent_dir, folder_mapping.get("")) - - document = Document( - search_space_id=search_space_id, + doc = _build_connector_doc( title=file_info["name"], - document_type=DocumentType.LOCAL_FOLDER_FILE, - document_metadata={ - "folder_name": folder_name, - "file_path": relative_path, - "mtime": file_info["modified_at"].timestamp(), - }, - content="Pending...", - content_hash=unique_identifier_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=None, - status=DocumentStatus.pending(), - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=None, - folder_id=folder_id, - ) - session.add(document) - new_documents_created = True - - files_to_process.append( - { - "document": document, - "is_new": True, - "file_info": file_info, - "content": content, - "content_hash": content_hash, - "unique_identifier_hash": unique_identifier_hash, - "relative_path": relative_path, - "title": file_info["name"], - } + content=content, + relative_path=relative_path, + folder_name=folder_name, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=enable_summary, ) + connector_docs.append(doc) + file_meta_map[unique_identifier] = { + "relative_path": relative_path, + "mtime": file_info["modified_at"].timestamp(), + } except Exception as e: logger.exception(f"Phase 1 error for {file_info.get('path')}: {e}") failed_count += 1 - if new_documents_created: - await session.commit() - # ================================================================ # PHASE 1.5: Delete documents no longer on disk # ================================================================ - # Collect ALL folder IDs under this root (including folders that no - # longer exist on disk but still have rows in the DB) so we catch - # documents in deleted directories too. 
all_root_folder_ids = set(folder_mapping.values()) all_db_folders = ( await session.execute( @@ -727,98 +706,51 @@ async def index_local_folder( await session.flush() # ================================================================ - # PHASE 2: Process each document + # PHASE 2: Index via unified pipeline # ================================================================ - long_context_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) + if connector_docs: + from app.indexing_pipeline.document_hashing import ( + compute_unique_identifier_hash, + ) - for item in files_to_process: - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + pipeline = IndexingPipelineService(session) + doc_map = { + compute_unique_identifier_hash(cd): cd for cd in connector_docs + } + documents = await pipeline.prepare_for_indexing(connector_docs) + + llm = await get_user_long_context_llm(session, user_id, search_space_id) + + for document in documents: + connector_doc = doc_map.get(document.unique_identifier_hash) + if connector_doc is None: + failed_count += 1 + continue + + result = await pipeline.index(document, connector_doc, llm) + + if DocumentStatus.is_state(result.status, DocumentStatus.READY): + indexed_count += 1 + + # Assign folder_id and mtime post-pipeline + rel_path = (connector_doc.metadata or {}).get("file_path", "") + parent_dir = str(Path(rel_path).parent) if rel_path else "" + if parent_dir == ".": + parent_dir = "" + fid = folder_mapping.get(parent_dir, folder_mapping.get("")) + + unique_id = connector_doc.unique_id + mtime_info = file_meta_map.get(unique_id, {}) + + result.folder_id = fid + doc_meta = dict(result.document_metadata or {}) + doc_meta["mtime"] = mtime_info.get("mtime") + result.document_metadata = doc_meta + else: + failed_count += 1 + + if on_heartbeat_callback and indexed_count % 5 == 0: await on_heartbeat_callback(indexed_count) - last_heartbeat_time = current_time - - document = item["document"] - try: - document.status = DocumentStatus.processing() - await session.commit() - - title = item["title"] - relative_path = item["relative_path"] - content = item["content"] - content_hash = item["content_hash"] - file_info = item["file_info"] - - metadata_sections = [ - ( - "METADATA", - [ - f"Title: {title}", - f"Folder: {folder_name}", - f"Path: {relative_path}", - ], - ), - ("CONTENT", [content]), - ] - document_string = build_document_metadata_string(metadata_sections) - - summary_content = "" - if long_context_llm and enable_summary: - doc_meta = { - "folder_name": folder_name, - "file_path": relative_path, - } - summary_content, _ = await generate_document_summary( - document_string, long_context_llm, doc_meta - ) - - embedding = embed_text(document_string) - chunks = await create_document_chunks(document_string) - - parent_dir = str(Path(relative_path).parent) - if parent_dir == ".": - parent_dir = "" - folder_id = folder_mapping.get(parent_dir, folder_mapping.get("")) - - document.title = title - document.content = document_string - document.content_hash = content_hash - document.source_markdown = content - document.embedding = embedding - document.document_metadata = { - "folder_name": folder_name, - "file_path": relative_path, - "summary": summary_content, - "mtime": file_info["modified_at"].timestamp(), - } - document.folder_id = folder_id - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = 
DocumentStatus.ready() - - indexed_count += 1 - - if indexed_count % 10 == 0: - await session.commit() - - except Exception as e: - logger.exception(f"Phase 2 error for {item.get('relative_path')}: {e}") - try: - await session.rollback() - except Exception: - pass - try: - document.status = DocumentStatus.failed(str(e)[:500]) - document.updated_at = get_current_timestamp() - await session.commit() - except Exception: - try: - await session.rollback() - except Exception: - pass - failed_count += 1 # Cleanup empty folders existing_dirs = set() @@ -846,8 +778,6 @@ async def index_local_folder( raise warning_parts = [] - if duplicate_count > 0: - warning_parts.append(f"{duplicate_count} duplicate") if failed_count > 0: warning_parts.append(f"{failed_count} failed") warning_message = ", ".join(warning_parts) if warning_parts else None @@ -859,7 +789,6 @@ async def index_local_folder( "indexed": indexed_count, "skipped": skipped_count, "failed": failed_count, - "duplicates": duplicate_count, }, ) @@ -899,8 +828,8 @@ async def _index_single_file( if not full_path.exists(): rel = str(full_path.relative_to(folder_path)) unique_id = f"{folder_name}:{rel}" - uid_hash = generate_unique_identifier_hash( - DocumentType.LOCAL_FOLDER_FILE, unique_id, search_space_id + uid_hash = compute_identifier_hash( + DocumentType.LOCAL_FOLDER_FILE.value, unique_id, search_space_id ) existing = await check_document_by_unique_identifier(session, uid_hash) if existing: @@ -918,8 +847,8 @@ async def _index_single_file( rel_path = str(full_path.relative_to(folder_path)) unique_id = f"{folder_name}:{rel_path}" - uid_hash = generate_unique_identifier_hash( - DocumentType.LOCAL_FOLDER_FILE, unique_id, search_space_id + uid_hash = compute_identifier_hash( + DocumentType.LOCAL_FOLDER_FILE.value, unique_id, search_space_id ) try: @@ -945,83 +874,51 @@ async def _index_single_file( await create_version_snapshot(session, existing) - long_context_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - title = full_path.name mtime = full_path.stat().st_mtime - metadata_sections = [ - ("METADATA", [f"Title: {title}", f"Folder: {folder_name}", f"Path: {rel_path}"]), - ("CONTENT", [content]), - ] - document_string = build_document_metadata_string(metadata_sections) + connector_doc = _build_connector_doc( + title=full_path.name, + content=content, + relative_path=rel_path, + folder_name=folder_name, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=enable_summary, + ) - summary_content = "" - if long_context_llm and enable_summary: - summary_content, _ = await generate_document_summary( - document_string, long_context_llm, {"folder_name": folder_name, "file_path": rel_path} - ) + pipeline = IndexingPipelineService(session) + llm = await get_user_long_context_llm(session, user_id, search_space_id) + documents = await pipeline.prepare_for_indexing([connector_doc]) - embedding = embed_text(document_string) - chunks = await create_document_chunks(document_string) + if not documents: + return 0, 1, None - doc_metadata = { - "folder_name": folder_name, - "file_path": rel_path, - "summary": summary_content, - "mtime": mtime, - } + db_doc = documents[0] + await pipeline.index(db_doc, connector_doc, llm) + # Post-pipeline: assign folder_id and mtime + await session.refresh(db_doc) folder_id = None if root_folder_id: folder_id = await _resolve_folder_for_file( session, rel_path, root_folder_id, search_space_id, user_id ) - - if existing: - existing.title = title - existing.content = document_string 
- existing.content_hash = content_hash - existing.source_markdown = content - existing.embedding = embedding - existing.document_metadata = doc_metadata - existing.folder_id = folder_id - await safe_set_chunks(session, existing, chunks) - existing.updated_at = get_current_timestamp() - existing.status = DocumentStatus.ready() - else: - document = Document( - search_space_id=search_space_id, - title=title, - document_type=DocumentType.LOCAL_FOLDER_FILE, - document_metadata=doc_metadata, - content=document_string, - content_hash=content_hash, - unique_identifier_hash=uid_hash, - source_markdown=content, - embedding=embedding, - status=DocumentStatus.ready(), - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=None, - folder_id=folder_id, - ) - session.add(document) - await session.flush() - for chunk in chunks: - chunk.document_id = document.id - session.add_all(chunks) - + db_doc.folder_id = folder_id + doc_meta = dict(db_doc.document_metadata or {}) + doc_meta["mtime"] = mtime + db_doc.document_metadata = doc_meta await session.commit() - await task_logger.log_task_success( - log_entry, - f"Single file indexed: {rel_path}", - {"file": rel_path}, - ) - return 1, 0, None + indexed = 1 if DocumentStatus.is_state(db_doc.status, DocumentStatus.READY) else 0 + failed_msg = None if indexed else "Indexing failed" + + if indexed: + await task_logger.log_task_success( + log_entry, + f"Single file indexed: {rel_path}", + {"file": rel_path}, + ) + return indexed, 0 if indexed else 1, failed_msg except Exception as e: logger.exception(f"Error indexing single file {target_file_path}: {e}") diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py index 110aa6caf..154cc6e0e 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py @@ -1,8 +1,7 @@ -"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F5), Tier 5 (P1).""" +"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F7), Tier 5 (P1).""" import os from pathlib import Path -from unittest.mock import AsyncMock, MagicMock import pytest from sqlalchemy import func, select @@ -18,41 +17,11 @@ from app.db import ( User, ) -import app.tasks.connector_indexers.local_folder_indexer as _lfi_mod - pytestmark = pytest.mark.integration - -@pytest.fixture -def patched_self_hosted(monkeypatch): - _cfg = type("_Cfg", (), {"is_self_hosted": staticmethod(lambda: True)})() - monkeypatch.setattr(_lfi_mod, "config", _cfg) - - -@pytest.fixture -def patched_embed_for_indexer(monkeypatch): - from app.config import config as app_config - dim = app_config.embedding_model_instance.dimension - mock = MagicMock(return_value=[0.1] * dim) - monkeypatch.setattr(_lfi_mod, "embed_text", mock) - return mock - - -@pytest.fixture -def patched_chunks_for_indexer(monkeypatch): - from app.db import Chunk - from app.config import config as app_config - dim = app_config.embedding_model_instance.dimension - - async def mock_create_chunks(text): - return [Chunk(content="chunk", embedding=[0.1] * dim)] - - monkeypatch.setattr(_lfi_mod, "create_document_chunks", mock_create_chunks) - - -@pytest.fixture -def patched_summary_for_indexer(monkeypatch): - monkeypatch.setattr(_lfi_mod, "get_user_long_context_llm", AsyncMock(return_value=None)) +UNIFIED_FIXTURES = ( + 
"patched_summarize", "patched_embed_texts", "patched_chunk_text", +) # ==================================================================== @@ -62,12 +31,7 @@ def patched_summary_for_indexer(monkeypatch): class TestFullIndexer: - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_i1_new_file_indexed( self, db_session: AsyncSession, @@ -103,12 +67,7 @@ class TestFullIndexer: assert docs[0].document_type == DocumentType.LOCAL_FOLDER_FILE assert DocumentStatus.is_state(docs[0].status, DocumentStatus.READY) - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_i2_unchanged_skipped( self, db_session: AsyncSession, @@ -130,7 +89,6 @@ class TestFullIndexer: ) assert count1 == 1 - # Second run — unchanged, pass root_folder_id from first run count2, _, _, _ = await index_local_folder( session=db_session, search_space_id=db_search_space.id, @@ -151,12 +109,7 @@ class TestFullIndexer: ).scalar_one() assert total == 1 - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_i3_changed_reindexed( self, db_session: AsyncSession, @@ -178,9 +131,7 @@ class TestFullIndexer: folder_name="test-folder", ) - # Modify f.write_text("# Version 2\n\nUpdated.") - # Touch mtime to ensure it's detected as different os.utime(f, (f.stat().st_atime + 10, f.stat().st_mtime + 10)) count, _, _, _ = await index_local_folder( @@ -193,7 +144,6 @@ class TestFullIndexer: ) assert count == 1 - # Should have a version snapshot versions = ( await db_session.execute( select(DocumentVersion).join(Document).where( @@ -204,12 +154,7 @@ class TestFullIndexer: ).scalars().all() assert len(versions) >= 1 - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_i4_deleted_removed( self, db_session: AsyncSession, @@ -262,12 +207,7 @@ class TestFullIndexer: ).scalar_one() assert docs_after == 0 - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_i5_single_file_mode( self, db_session: AsyncSession, @@ -305,18 +245,13 @@ class TestFullIndexer: # ==================================================================== -# Tier 4: Folder Mirroring (F1-F5) +# Tier 4: Folder Mirroring (F1-F7) # ==================================================================== class TestFolderMirroring: - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_f1_root_folder_created( self, db_session: AsyncSession, @@ -344,12 +279,7 @@ class TestFolderMirroring: ).scalar_one() assert root_folder.name == "test-folder" - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def 
test_f2_nested_folder_rows( self, db_session: AsyncSession, @@ -393,12 +323,7 @@ class TestFolderMirroring: assert daily_folder.parent_id == notes_folder.id assert weekly_folder.parent_id == notes_folder.id - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_f3_resync_reuses_folders( self, db_session: AsyncSession, @@ -428,7 +353,6 @@ class TestFolderMirroring: ).scalars().all() ids_before = {f.id for f in folders_before} - # Re-sync with root_folder_id from first run await index_local_folder( session=db_session, search_space_id=db_search_space.id, @@ -447,12 +371,7 @@ class TestFolderMirroring: assert ids_before == ids_after - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_f4_folder_id_assigned( self, db_session: AsyncSession, @@ -496,15 +415,9 @@ class TestFolderMirroring: assert today_doc.folder_id == daily_folder.id - # Root doc should be in the root folder assert root_doc.folder_id == root_folder_id - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_f5_empty_folder_cleanup( self, db_session: AsyncSession, @@ -531,7 +444,6 @@ class TestFolderMirroring: folder_name="test-folder", ) - # Verify weekly folder exists weekly_folder = ( await db_session.execute( select(Folder).where(Folder.name == "weekly") @@ -539,7 +451,6 @@ class TestFolderMirroring: ).scalar_one_or_none() assert weekly_folder is not None - # Delete weekly directory + its file shutil.rmtree(weekly) await index_local_folder( @@ -551,7 +462,6 @@ class TestFolderMirroring: root_folder_id=root_folder_id, ) - # weekly Folder should be gone (empty, dir removed) weekly_after = ( await db_session.execute( select(Folder).where(Folder.name == "weekly") @@ -559,7 +469,6 @@ class TestFolderMirroring: ).scalar_one_or_none() assert weekly_after is None - # daily should still exist daily_after = ( await db_session.execute( select(Folder).where(Folder.name == "daily") @@ -567,12 +476,7 @@ class TestFolderMirroring: ).scalar_one_or_none() assert daily_after is not None - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_f6_single_file_creates_subfolder( self, db_session: AsyncSession, @@ -634,12 +538,7 @@ class TestFolderMirroring: assert daily_folder.parent_id == notes_folder.id assert notes_folder.parent_id == root_folder_id - @pytest.mark.usefixtures( - "patched_self_hosted", - "patched_embed_for_indexer", - "patched_chunks_for_indexer", - "patched_summary_for_indexer", - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_f7_single_file_delete_cleans_empty_folders( self, db_session: AsyncSession, @@ -705,9 +604,7 @@ class TestFolderMirroring: class TestPipelineIntegration: - @pytest.mark.usefixtures( - "patched_summarize", "patched_embed_texts", "patched_chunk_text" - ) + @pytest.mark.usefixtures(*UNIFIED_FIXTURES) async def test_p1_local_folder_file_through_pipeline( self, db_session: AsyncSession,