diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 25045e84a..1a4d3ea06 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -110,7 +110,6 @@ class SearchSourceConnectorType(StrEnum): COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR" COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR" COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR" - LOCAL_FOLDER_CONNECTOR = "LOCAL_FOLDER_CONNECTOR" class PodcastStatus(StrEnum): diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index 2d999eae3..d7974f9ff 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -2,6 +2,7 @@ import asyncio from fastapi import APIRouter, Depends, Form, HTTPException, UploadFile +from pydantic import BaseModel as PydanticBaseModel from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from sqlalchemy.orm import selectinload @@ -11,6 +12,7 @@ from app.db import ( Document, DocumentType, DocumentVersion, + Folder, Permission, SearchSpace, SearchSpaceMembership, @@ -1258,3 +1260,144 @@ async def restore_document_version( "document_id": document_id, "restored_version": version_number, } + + +# ===== Local folder indexing endpoints ===== + + +class FolderIndexRequest(PydanticBaseModel): + folder_path: str + folder_name: str + search_space_id: int + exclude_patterns: list[str] | None = None + file_extensions: list[str] | None = None + root_folder_id: int | None = None + enable_summary: bool = False + + +class FolderIndexFileRequest(PydanticBaseModel): + folder_path: str + folder_name: str + search_space_id: int + target_file_path: str + enable_summary: bool = False + + +@router.post("/documents/folder-index") +async def folder_index( + request: FolderIndexRequest, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """Full-scan index of a local folder. Creates the root Folder row synchronously + and dispatches the heavy indexing work to a Celery task. + Returns the root_folder_id so the desktop can persist it. + """ + from app.config import config as app_config + + if not app_config.is_self_hosted(): + raise HTTPException( + status_code=400, + detail="Local folder indexing is only available in self-hosted mode", + ) + + await check_permission( + session, + user, + request.search_space_id, + Permission.DOCUMENTS_CREATE.value, + "You don't have permission to create documents in this search space", + ) + + root_folder_id = request.root_folder_id + if root_folder_id: + existing = ( + await session.execute( + select(Folder).where(Folder.id == root_folder_id) + ) + ).scalar_one_or_none() + if not existing: + root_folder_id = None + + if not root_folder_id: + root_folder = Folder( + name=request.folder_name, + search_space_id=request.search_space_id, + created_by_id=str(user.id), + position="a0", + ) + session.add(root_folder) + await session.flush() + root_folder_id = root_folder.id + await session.commit() + + from app.tasks.celery_tasks.document_tasks import index_local_folder_task + + index_local_folder_task.delay( + search_space_id=request.search_space_id, + user_id=str(user.id), + folder_path=request.folder_path, + folder_name=request.folder_name, + exclude_patterns=request.exclude_patterns, + file_extensions=request.file_extensions, + root_folder_id=root_folder_id, + enable_summary=request.enable_summary, + ) + + return { + "message": "Folder indexing started", + "status": "processing", + "root_folder_id": root_folder_id, + } + + +@router.post("/documents/folder-index-file") +async def folder_index_file( + request: FolderIndexFileRequest, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """Index a single file within a watched folder (chokidar trigger). + Validates that target_file_path is under folder_path. + """ + from app.config import config as app_config + + if not app_config.is_self_hosted(): + raise HTTPException( + status_code=400, + detail="Local folder indexing is only available in self-hosted mode", + ) + + await check_permission( + session, + user, + request.search_space_id, + Permission.DOCUMENTS_CREATE.value, + "You don't have permission to create documents in this search space", + ) + + from pathlib import Path + + try: + Path(request.target_file_path).relative_to(request.folder_path) + except ValueError: + raise HTTPException( + status_code=400, + detail="target_file_path must be inside folder_path", + ) + + from app.tasks.celery_tasks.document_tasks import index_local_folder_task + + index_local_folder_task.delay( + search_space_id=request.search_space_id, + user_id=str(user.id), + folder_path=request.folder_path, + folder_name=request.folder_name, + target_file_path=request.target_file_path, + enable_summary=request.enable_summary, + ) + + return { + "message": "File indexing started", + "status": "processing", + } diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 5ea88c418..f49ba2d5d 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1170,24 +1170,6 @@ async def index_connector_content( ) response_message = "Obsidian vault indexing started in the background." - elif connector.connector_type == SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR: - from app.config import config as app_config - from app.tasks.celery_tasks.connector_tasks import index_local_folder_task - - if not app_config.is_self_hosted(): - raise HTTPException( - status_code=400, - detail="Local folder connector is only available in self-hosted mode", - ) - - logger.info( - f"Triggering local folder indexing for connector {connector_id} into search space {search_space_id}" - ) - index_local_folder_task.delay( - connector_id, search_space_id, str(user.id), indexing_from, indexing_to - ) - response_message = "Local folder indexing started in the background." - elif ( connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR @@ -1320,76 +1302,6 @@ async def index_connector_content( ) from e -class IndexFileRequest(BaseModel): - file_path: str = Field(..., description="Absolute path to the file to index") - - -@router.post( - "/search-source-connectors/{connector_id}/index-file", - response_model=dict[str, Any], -) -async def index_single_file( - connector_id: int, - body: IndexFileRequest, - session: AsyncSession = Depends(get_async_session), - user: User = Depends(current_active_user), -): - """Index a single file from a local folder connector (chokidar real-time trigger).""" - from app.config import config as app_config - from app.tasks.celery_tasks.connector_tasks import index_local_folder_task - - if not app_config.is_self_hosted(): - raise HTTPException( - status_code=400, - detail="Local folder connector is only available in self-hosted mode", - ) - - result = await session.execute( - select(SearchSourceConnector).filter( - SearchSourceConnector.id == connector_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR, - ) - ) - connector = result.scalars().first() - if not connector: - raise HTTPException(status_code=404, detail="Local folder connector not found") - - await check_permission(session, user, connector.search_space_id, Permission.CONNECTORS_UPDATE.value) - - folder_path = connector.config.get("folder_path", "") - - # Security: resolve symlinks and verify the file is inside folder_path - try: - resolved_file = os.path.realpath(body.file_path) - resolved_folder = os.path.realpath(folder_path) - if not resolved_file.startswith(resolved_folder + os.sep) and resolved_file != resolved_folder: - raise HTTPException( - status_code=403, - detail="File path is outside the configured folder", - ) - except (OSError, ValueError): - raise HTTPException( - status_code=403, - detail="Invalid file path", - ) - - index_local_folder_task.delay( - connector_id, - connector.search_space_id, - str(user.id), - None, - None, - target_file_path=resolved_file, - ) - - return { - "message": "Single file indexing started", - "connector_id": connector_id, - "file_path": body.file_path, - } - - async def _update_connector_timestamp_by_id(session: AsyncSession, connector_id: int): """ Update the last_indexed_at timestamp for a connector by its ID. @@ -3166,62 +3078,6 @@ async def run_obsidian_indexing( ) -async def run_local_folder_indexing_with_new_session( - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str, - end_date: str, - target_file_path: str | None = None, -): - """Wrapper to run local folder indexing with its own database session.""" - logger.info( - f"Background task started: Indexing local folder connector {connector_id} into space {search_space_id}" - ) - async with async_session_maker() as session: - await run_local_folder_indexing( - session, connector_id, search_space_id, user_id, start_date, end_date, - target_file_path=target_file_path, - ) - logger.info(f"Background task finished: Indexing local folder connector {connector_id}") - - -async def run_local_folder_indexing( - session: AsyncSession, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str, - end_date: str, - target_file_path: str | None = None, -): - """Background task to run local folder indexing.""" - from app.tasks.connector_indexers import index_local_folder - - await _run_indexing_with_notifications( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - indexing_function=lambda session, connector_id, search_space_id, user_id, - start_date, end_date, update_last_indexed, on_heartbeat_callback: index_local_folder( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=update_last_indexed, - on_heartbeat_callback=on_heartbeat_callback, - target_file_path=target_file_path, - ), - update_timestamp_func=_update_connector_timestamp_by_id, - supports_heartbeat_callback=True, - ) - - async def run_composio_indexing_with_new_session( connector_id: int, search_space_id: int, diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 9ff578ad2..57475c9fd 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -926,52 +926,6 @@ async def _index_obsidian_vault( ) -@celery_app.task(name="index_local_folder", bind=True) -def index_local_folder_task( - self, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str = None, - end_date: str = None, - target_file_path: str = None, -): - """Celery task to index a local folder.""" - import asyncio - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - loop.run_until_complete( - _index_local_folder( - connector_id, search_space_id, user_id, start_date, end_date, target_file_path - ) - ) - finally: - loop.close() - - -async def _index_local_folder( - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str = None, - end_date: str = None, - target_file_path: str = None, -): - """Index local folder with new session.""" - from app.routes.search_source_connectors_routes import ( - run_local_folder_indexing, - ) - - async with get_celery_session_maker()() as session: - await run_local_folder_indexing( - session, connector_id, search_space_id, user_id, start_date, end_date, - target_file_path=target_file_path, - ) - - @celery_app.task(name="index_composio_connector", bind=True) def index_composio_connector_task( self, diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 662b41f2a..110f3deee 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -10,6 +10,7 @@ from app.config import config from app.services.notification_service import NotificationService from app.services.task_logging_service import TaskLoggingService from app.tasks.celery_tasks import get_celery_session_maker +from app.tasks.connector_indexers.local_folder_indexer import index_local_folder from app.tasks.document_processors import ( add_extension_received_document, add_youtube_video_document, @@ -1243,3 +1244,68 @@ async def _process_circleback_meeting( heartbeat_task.cancel() if notification: _stop_heartbeat(notification.id) + + +# ===== Local folder indexing task ===== + + +@celery_app.task(name="index_local_folder", bind=True) +def index_local_folder_task( + self, + search_space_id: int, + user_id: str, + folder_path: str, + folder_name: str, + exclude_patterns: list[str] | None = None, + file_extensions: list[str] | None = None, + root_folder_id: int | None = None, + enable_summary: bool = False, + target_file_path: str | None = None, +): + """Celery task to index a local folder. Config is passed directly — no connector row.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _index_local_folder_async( + search_space_id=search_space_id, + user_id=user_id, + folder_path=folder_path, + folder_name=folder_name, + exclude_patterns=exclude_patterns, + file_extensions=file_extensions, + root_folder_id=root_folder_id, + enable_summary=enable_summary, + target_file_path=target_file_path, + ) + ) + finally: + loop.close() + + +async def _index_local_folder_async( + search_space_id: int, + user_id: str, + folder_path: str, + folder_name: str, + exclude_patterns: list[str] | None = None, + file_extensions: list[str] | None = None, + root_folder_id: int | None = None, + enable_summary: bool = False, + target_file_path: str | None = None, +): + """Run local folder indexing with a fresh DB session.""" + async with get_celery_session_maker()() as session: + await index_local_folder( + session=session, + search_space_id=search_space_id, + user_id=user_id, + folder_path=folder_path, + folder_name=folder_name, + exclude_patterns=exclude_patterns, + file_extensions=file_extensions, + root_folder_id=root_folder_id, + enable_summary=enable_summary, + target_file_path=target_file_path, + ) diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py b/surfsense_backend/app/tasks/connector_indexers/__init__.py index 8e4ad69e5..1b032d54a 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -44,7 +44,6 @@ from .jira_indexer import index_jira_issues from .linear_indexer import index_linear_issues # Documentation and knowledge management -from .local_folder_indexer import index_local_folder from .luma_indexer import index_luma_events from .notion_indexer import index_notion_pages from .obsidian_indexer import index_obsidian_vault @@ -75,5 +74,4 @@ __all__ = [ # noqa: RUF022 # Communication platforms "index_slack_messages", "index_google_gmail_messages", - "index_local_folder", ] diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py index fc7fdaf66..591914625 100644 --- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py @@ -1,5 +1,5 @@ """ -Local folder connector indexer. +Local folder indexer. Indexes files from a local folder on disk. Supports: - Full-scan mode (startup reconciliation / manual trigger) @@ -8,7 +8,9 @@ Indexes files from a local folder on disk. Supports: - Document versioning via create_version_snapshot - ETL-based file parsing for binary formats (PDF, DOCX, images, audio, etc.) -Electron-only: all change detection is driven by chokidar in the desktop app. +Desktop-only: all change detection is driven by chokidar in the desktop app. +Config (folder_path, exclude_patterns, etc.) is passed in from the caller — +no connector row is read. """ import os @@ -17,10 +19,9 @@ from collections.abc import Awaitable, Callable from datetime import UTC, datetime from pathlib import Path -from sqlalchemy import delete, select +from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified from app.config import config from app.db import ( @@ -28,7 +29,6 @@ from app.db import ( DocumentStatus, DocumentType, Folder, - SearchSourceConnectorType, ) from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService @@ -45,11 +45,9 @@ from .base import ( build_document_metadata_string, check_document_by_unique_identifier, check_duplicate_document_by_hash, - get_connector_by_id, get_current_timestamp, logger, safe_set_chunks, - update_connector_last_indexed, ) PLAINTEXT_EXTENSIONS = frozenset({ @@ -131,12 +129,10 @@ def scan_folder( for dirpath, dirnames, filenames in os.walk(root): rel_dir = Path(dirpath).relative_to(root) - # Prune excluded directories in-place so os.walk skips them dirnames[:] = [ d for d in dirnames if d not in exclude_patterns ] - # Check if the current directory itself is excluded if any(part in exclude_patterns for part in rel_dir.parts): continue @@ -232,20 +228,18 @@ async def _mirror_folder_structure( folder_name: str, search_space_id: int, user_id: str, - connector_config: dict, - connector, + root_folder_id: int | None = None, exclude_patterns: list[str] | None = None, -) -> dict[str, int]: +) -> tuple[dict[str, int], int]: """Mirror the local filesystem directory structure into DB Folder rows. - Returns a mapping of relative_dir_path -> folder_id. - The empty string key ("") maps to the root folder. + Returns (mapping, root_folder_id) where mapping is + relative_dir_path -> folder_id. The empty string key maps to the root folder. """ root = Path(folder_path) if exclude_patterns is None: exclude_patterns = [] - # Collect all subdirectory paths relative to root subdirs: list[str] = [] for dirpath, dirnames, _ in os.walk(root): dirnames[:] = [d for d in dirnames if d not in exclude_patterns] @@ -256,13 +250,10 @@ async def _mirror_folder_structure( if rel_str: subdirs.append(rel_str) - # Sort by depth so parents are created before children subdirs.sort(key=lambda p: p.count(os.sep)) mapping: dict[str, int] = {} - # Get or create root folder - root_folder_id = connector_config.get("root_folder_id") if root_folder_id: existing = ( await session.execute( @@ -284,12 +275,8 @@ async def _mirror_folder_structure( session.add(root_folder) await session.flush() mapping[""] = root_folder.id - # Persist root_folder_id into connector config - connector_config["root_folder_id"] = root_folder.id - connector.config = {**connector.config, "root_folder_id": root_folder.id} - flag_modified(connector, "config") + root_folder_id = root_folder.id - # Create/reuse subdirectory Folder rows for rel_dir in subdirs: dir_parts = Path(rel_dir).parts dir_name = dir_parts[-1] @@ -322,7 +309,7 @@ async def _mirror_folder_structure( mapping[rel_dir] = new_folder.id await session.flush() - return mapping + return mapping, root_folder_id async def _cleanup_empty_folders( @@ -332,16 +319,11 @@ async def _cleanup_empty_folders( existing_dirs_on_disk: set[str], folder_mapping: dict[str, int], ) -> None: - """Delete Folder rows that are empty (no docs, no children) and no longer on disk. + """Delete Folder rows that are empty (no docs, no children) and no longer on disk.""" + from sqlalchemy import delete as sa_delete - Queries ALL folders under this search space (not just the current mapping) - so that stale folders from previous syncs are also cleaned up. - """ - # Build a reverse mapping from folder_id → rel_dir for known dirs id_to_rel: dict[int, str] = {fid: rel for rel, fid in folder_mapping.items() if rel} - # Also find any folders in the DB that are children of the root but NOT - # in the current mapping (stale from a previous sync). all_folders = ( await session.execute( select(Folder).where( @@ -351,7 +333,6 @@ async def _cleanup_empty_folders( ) ).scalars().all() - # Build candidates: folders not on disk that we might delete candidates: list[Folder] = [] for folder in all_folders: rel = id_to_rel.get(folder.id) @@ -359,8 +340,6 @@ async def _cleanup_empty_folders( continue candidates.append(folder) - # Sort deepest first (by name depth heuristic — folders with no children first) - # Repeat until no more deletions happen (cascading empty parents) changed = True while changed: changed = False @@ -384,57 +363,46 @@ async def _cleanup_empty_folders( remaining.append(folder) continue - await session.execute(delete(Folder).where(Folder.id == folder.id)) + await session.execute(sa_delete(Folder).where(Folder.id == folder.id)) changed = True candidates = remaining async def index_local_folder( session: AsyncSession, - connector_id: int, search_space_id: int, user_id: str, - start_date: str | None = None, - end_date: str | None = None, - update_last_indexed: bool = True, - on_heartbeat_callback: HeartbeatCallbackType | None = None, + folder_path: str, + folder_name: str, + exclude_patterns: list[str] | None = None, + file_extensions: list[str] | None = None, + root_folder_id: int | None = None, + enable_summary: bool = False, target_file_path: str | None = None, -) -> tuple[int, int, str | None]: + on_heartbeat_callback: HeartbeatCallbackType | None = None, +) -> tuple[int, int, int | None, str | None]: """Index files from a local folder. Supports two modes: - Full scan (target_file_path=None): walks entire folder, handles new/changed/deleted files. - Single-file (target_file_path set): processes only that file. - Returns (indexed_count, skipped_count, error_or_warning_message). + Returns (indexed_count, skipped_count, root_folder_id, error_or_warning_message). """ task_logger = TaskLoggingService(session, search_space_id) log_entry = await task_logger.log_task_start( task_name="local_folder_indexing", - source="connector_indexing_task", - message=f"Starting local folder indexing for connector {connector_id}", + source="local_folder_indexing_task", + message=f"Starting local folder indexing for {folder_name}", metadata={ - "connector_id": connector_id, + "folder_path": folder_path, "user_id": str(user_id), "target_file_path": target_file_path, }, ) try: - connector = await get_connector_by_id( - session, connector_id, SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR - ) - if not connector: - await task_logger.log_task_failure( - log_entry, - f"Connector {connector_id} not found", - "Connector not found", - {}, - ) - return 0, 0, f"Connector {connector_id} not found" - - folder_path = connector.config.get("folder_path") if not folder_path or not os.path.exists(folder_path): await task_logger.log_task_failure( log_entry, @@ -442,59 +410,54 @@ async def index_local_folder( "Folder not found", {}, ) - return 0, 0, f"Folder path missing or does not exist: {folder_path}" + return 0, 0, root_folder_id, f"Folder path missing or does not exist: {folder_path}" - folder_name = connector.config.get("folder_name") or os.path.basename(folder_path) - exclude_patterns = connector.config.get("exclude_patterns", DEFAULT_EXCLUDE_PATTERNS) - file_extensions = connector.config.get("file_extensions") # None = all + if exclude_patterns is None: + exclude_patterns = DEFAULT_EXCLUDE_PATTERNS # ==================================================================== # SINGLE-FILE MODE # ==================================================================== if target_file_path: - return await _index_single_file( + indexed, skipped, err = await _index_single_file( session=session, - connector=connector, - connector_id=connector_id, search_space_id=search_space_id, user_id=user_id, folder_path=folder_path, folder_name=folder_name, target_file_path=target_file_path, + enable_summary=enable_summary, task_logger=task_logger, log_entry=log_entry, - update_last_indexed=update_last_indexed, ) + return indexed, skipped, root_folder_id, err # ==================================================================== # FULL-SCAN MODE # ==================================================================== - # Phase 0: Mirror folder structure await task_logger.log_task_progress( log_entry, "Mirroring folder structure", {"stage": "folder_mirror"} ) - folder_mapping = await _mirror_folder_structure( + folder_mapping, root_folder_id = await _mirror_folder_structure( session=session, folder_path=folder_path, folder_name=folder_name, search_space_id=search_space_id, user_id=user_id, - connector_config=connector.config, - connector=connector, + root_folder_id=root_folder_id, exclude_patterns=exclude_patterns, ) await session.flush() - # Scan files on disk try: files = scan_folder(folder_path, file_extensions, exclude_patterns) except Exception as e: await task_logger.log_task_failure( log_entry, f"Failed to scan folder: {e}", "Scan error", {} ) - return 0, 0, f"Failed to scan folder: {e}" + return 0, 0, root_folder_id, f"Failed to scan folder: {e}" logger.info(f"Found {len(files)} files in folder") @@ -530,7 +493,6 @@ async def index_local_folder( ) if existing_document: - # Check mtime first (cheap) stored_mtime = (existing_document.document_metadata or {}).get("mtime") current_mtime = file_info["modified_at"].timestamp() @@ -542,7 +504,6 @@ async def index_local_folder( skipped_count += 1 continue - # mtime differs — read file and check content hash try: content, content_hash = await _compute_file_content_hash( file_path_abs, file_info["relative_path"], search_space_id @@ -553,7 +514,6 @@ async def index_local_folder( continue if existing_document.content_hash == content_hash: - # Content same, just update mtime in metadata meta = dict(existing_document.document_metadata or {}) meta["mtime"] = current_mtime existing_document.document_metadata = meta @@ -564,7 +524,6 @@ async def index_local_folder( skipped_count += 1 continue - # Content actually changed — snapshot version, queue for re-index await create_version_snapshot(session, existing_document) files_to_process.append( @@ -581,7 +540,6 @@ async def index_local_folder( ) continue - # New document — read content try: content, content_hash = await _compute_file_content_hash( file_path_abs, file_info["relative_path"], search_space_id @@ -595,7 +553,6 @@ async def index_local_folder( skipped_count += 1 continue - # Check for duplicate content from another connector with session.no_autoflush: dup = await check_duplicate_document_by_hash(session, content_hash) if dup: @@ -603,7 +560,6 @@ async def index_local_folder( skipped_count += 1 continue - # Determine folder_id for this file parent_dir = str(Path(relative_path).parent) if parent_dir == ".": parent_dir = "" @@ -616,17 +572,16 @@ async def index_local_folder( document_metadata={ "folder_name": folder_name, "file_path": relative_path, - "connector_id": connector_id, "mtime": file_info["modified_at"].timestamp(), }, content="Pending...", - content_hash=unique_identifier_hash, # Temp unique — updated in phase 2 + content_hash=unique_identifier_hash, unique_identifier_hash=unique_identifier_hash, embedding=None, status=DocumentStatus.pending(), updated_at=get_current_timestamp(), created_by_id=user_id, - connector_id=connector_id, + connector_id=None, folder_id=folder_id, ) session.add(document) @@ -655,16 +610,17 @@ async def index_local_folder( # ================================================================ # PHASE 1.5: Delete documents no longer on disk # ================================================================ - all_connector_docs = ( + all_folder_docs = ( await session.execute( select(Document).where( - Document.connector_id == connector_id, Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == search_space_id, + Document.folder_id.in_(list(folder_mapping.values())), ) ) ).scalars().all() - for doc in all_connector_docs: + for doc in all_folder_docs: if doc.unique_identifier_hash not in seen_unique_hashes: await session.delete(doc) @@ -709,7 +665,7 @@ async def index_local_folder( document_string = build_document_metadata_string(metadata_sections) summary_content = "" - if long_context_llm and connector.enable_summary: + if long_context_llm and enable_summary: doc_meta = { "folder_name": folder_name, "file_path": relative_path, @@ -721,7 +677,6 @@ async def index_local_folder( embedding = embed_text(document_string) chunks = await create_document_chunks(document_string) - # Determine folder_id parent_dir = str(Path(relative_path).parent) if parent_dir == ".": parent_dir = "" @@ -735,7 +690,6 @@ async def index_local_folder( document.document_metadata = { "folder_name": folder_name, "file_path": relative_path, - "connector_id": connector_id, "summary": summary_content, "mtime": file_info["modified_at"].timestamp(), } @@ -782,8 +736,6 @@ async def index_local_folder( session, root_fid, search_space_id, existing_dirs, folder_mapping ) - await update_connector_last_indexed(session, connector, update_last_indexed) - try: await session.commit() except Exception as e: @@ -802,7 +754,7 @@ async def index_local_folder( await task_logger.log_task_success( log_entry, - f"Completed local folder indexing for connector {connector_id}", + f"Completed local folder indexing for {folder_name}", { "indexed": indexed_count, "skipped": skipped_count, @@ -811,7 +763,7 @@ async def index_local_folder( }, ) - return indexed_count, skipped_count, warning_message + return indexed_count, skipped_count, root_folder_id, warning_message except SQLAlchemyError as e: logger.exception(f"Database error during local folder indexing: {e}") @@ -819,34 +771,31 @@ async def index_local_folder( await task_logger.log_task_failure( log_entry, f"DB error: {e}", "Database error", {} ) - return 0, 0, f"Database error: {e}" + return 0, 0, root_folder_id, f"Database error: {e}" except Exception as e: logger.exception(f"Error during local folder indexing: {e}") await task_logger.log_task_failure( log_entry, f"Error: {e}", "Unexpected error", {} ) - return 0, 0, str(e) + return 0, 0, root_folder_id, str(e) async def _index_single_file( session: AsyncSession, - connector, - connector_id: int, search_space_id: int, user_id: str, folder_path: str, folder_name: str, target_file_path: str, + enable_summary: bool, task_logger, log_entry, - update_last_indexed: bool = True, ) -> tuple[int, int, str | None]: """Process a single file (chokidar real-time trigger).""" try: full_path = Path(target_file_path) if not full_path.exists(): - # File was deleted — find and remove the document rel = str(full_path.relative_to(folder_path)) unique_id = f"{folder_name}:{rel}" uid_hash = generate_unique_identifier_hash( @@ -880,7 +829,6 @@ async def _index_single_file( if existing: if existing.content_hash == content_hash: - # Update mtime mtime = full_path.stat().st_mtime meta = dict(existing.document_metadata or {}) meta["mtime"] = mtime @@ -888,10 +836,8 @@ async def _index_single_file( await session.commit() return 0, 1, None - # Content changed — snapshot + re-index await create_version_snapshot(session, existing) - # Get LLM long_context_llm = await get_user_long_context_llm( session, user_id, search_space_id ) @@ -906,7 +852,7 @@ async def _index_single_file( document_string = build_document_metadata_string(metadata_sections) summary_content = "" - if long_context_llm and connector.enable_summary: + if long_context_llm and enable_summary: summary_content, _ = await generate_document_summary( document_string, long_context_llm, {"folder_name": folder_name, "file_path": rel_path} ) @@ -917,7 +863,6 @@ async def _index_single_file( doc_metadata = { "folder_name": folder_name, "file_path": rel_path, - "connector_id": connector_id, "summary": summary_content, "mtime": mtime, } @@ -946,16 +891,14 @@ async def _index_single_file( status=DocumentStatus.ready(), updated_at=get_current_timestamp(), created_by_id=user_id, - connector_id=connector_id, + connector_id=None, ) session.add(document) - # Set chunks await session.flush() for chunk in chunks: chunk.document_id = document.id session.add_all(chunks) - await update_connector_last_indexed(session, connector, update_last_indexed) await session.commit() await task_logger.log_task_success( diff --git a/surfsense_backend/tests/integration/conftest.py b/surfsense_backend/tests/integration/conftest.py index 840246e2f..9c91011ae 100644 --- a/surfsense_backend/tests/integration/conftest.py +++ b/surfsense_backend/tests/integration/conftest.py @@ -168,22 +168,3 @@ def make_connector_document(db_connector, db_user): return _make -@pytest_asyncio.fixture -async def db_local_folder_connector( - db_session: AsyncSession, db_user: User, db_search_space: SearchSpace, tmp_path -) -> SearchSourceConnector: - connector = SearchSourceConnector( - name="Test Local Folder", - connector_type=SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR, - config={ - "folder_path": str(tmp_path), - "folder_name": "test-folder", - "exclude_patterns": [], - "file_extensions": None, - }, - search_space_id=db_search_space.id, - user_id=db_user.id, - ) - db_session.add(connector) - await db_session.flush() - return connector diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py index 988905f8f..e46d59a67 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py @@ -14,7 +14,6 @@ from app.db import ( DocumentType, DocumentVersion, Folder, - SearchSourceConnector, SearchSpace, User, ) @@ -72,7 +71,6 @@ class TestFullIndexer: async def test_i1_new_file_indexed( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -82,11 +80,12 @@ class TestFullIndexer: (tmp_path / "note.md").write_text("# Hello World\n\nContent here.") - count, skipped, err = await index_local_folder( + count, skipped, root_folder_id, err = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) assert err is None @@ -95,7 +94,8 @@ class TestFullIndexer: docs = ( await db_session.execute( select(Document).where( - Document.connector_id == db_local_folder_connector.id + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == db_search_space.id, ) ) ).scalars().all() @@ -112,7 +112,6 @@ class TestFullIndexer: async def test_i2_unchanged_skipped( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -122,27 +121,31 @@ class TestFullIndexer: (tmp_path / "note.md").write_text("# Hello\n\nSame content.") - count1, _, _ = await index_local_folder( + count1, _, root_folder_id, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) assert count1 == 1 - # Second run — unchanged - count2, _, _ = await index_local_folder( + # Second run — unchanged, pass root_folder_id from first run + count2, _, _, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", + root_folder_id=root_folder_id, ) assert count2 == 0 total = ( await db_session.execute( select(func.count()).select_from(Document).where( - Document.connector_id == db_local_folder_connector.id + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == db_search_space.id, ) ) ).scalar_one() @@ -157,7 +160,6 @@ class TestFullIndexer: async def test_i3_changed_reindexed( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -168,11 +170,12 @@ class TestFullIndexer: f = tmp_path / "note.md" f.write_text("# Version 1\n\nOriginal.") - await index_local_folder( + _, _, root_folder_id, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) # Modify @@ -180,11 +183,13 @@ class TestFullIndexer: # Touch mtime to ensure it's detected as different os.utime(f, (f.stat().st_atime + 10, f.stat().st_mtime + 10)) - count, _, _ = await index_local_folder( + count, _, _, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", + root_folder_id=root_folder_id, ) assert count == 1 @@ -192,7 +197,8 @@ class TestFullIndexer: versions = ( await db_session.execute( select(DocumentVersion).join(Document).where( - Document.connector_id == db_local_folder_connector.id + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == db_search_space.id, ) ) ).scalars().all() @@ -207,7 +213,6 @@ class TestFullIndexer: async def test_i4_deleted_removed( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -218,17 +223,19 @@ class TestFullIndexer: f = tmp_path / "to_delete.md" f.write_text("# Delete me") - await index_local_folder( + _, _, root_folder_id, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) docs_before = ( await db_session.execute( select(func.count()).select_from(Document).where( - Document.connector_id == db_local_folder_connector.id + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == db_search_space.id, ) ) ).scalar_one() @@ -238,15 +245,18 @@ class TestFullIndexer: await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", + root_folder_id=root_folder_id, ) docs_after = ( await db_session.execute( select(func.count()).select_from(Document).where( - Document.connector_id == db_local_folder_connector.id + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == db_search_space.id, ) ) ).scalar_one() @@ -261,7 +271,6 @@ class TestFullIndexer: async def test_i5_single_file_mode( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -273,11 +282,12 @@ class TestFullIndexer: (tmp_path / "b.md").write_text("File B") (tmp_path / "c.md").write_text("File C") - count, _, _ = await index_local_folder( + count, _, _, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", target_file_path=str(tmp_path / "b.md"), ) assert count == 1 @@ -285,12 +295,13 @@ class TestFullIndexer: docs = ( await db_session.execute( select(Document).where( - Document.connector_id == db_local_folder_connector.id + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == db_search_space.id, ) ) ).scalars().all() assert len(docs) == 1 - assert docs[0].title == "b" + assert docs[0].title == "b.md" # ==================================================================== @@ -309,30 +320,27 @@ class TestFolderMirroring: async def test_f1_root_folder_created( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, ): - """F1: First sync creates a root Folder and stores root_folder_id.""" + """F1: First sync creates a root Folder and returns root_folder_id.""" from app.tasks.connector_indexers.local_folder_indexer import index_local_folder (tmp_path / "root.md").write_text("Root file") - await index_local_folder( + _, _, root_folder_id, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) - # Refresh connector - await db_session.refresh(db_local_folder_connector) - root_id = db_local_folder_connector.config.get("root_folder_id") - assert root_id is not None + assert root_folder_id is not None root_folder = ( - await db_session.execute(select(Folder).where(Folder.id == root_id)) + await db_session.execute(select(Folder).where(Folder.id == root_folder_id)) ).scalar_one() assert root_folder.name == "test-folder" @@ -345,7 +353,6 @@ class TestFolderMirroring: async def test_f2_nested_folder_rows( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -362,9 +369,10 @@ class TestFolderMirroring: await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) folders = ( @@ -394,7 +402,6 @@ class TestFolderMirroring: async def test_f3_resync_reuses_folders( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -406,11 +413,12 @@ class TestFolderMirroring: sub.mkdir() (sub / "file.md").write_text("content") - await index_local_folder( + _, _, root_folder_id, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) folders_before = ( @@ -420,12 +428,14 @@ class TestFolderMirroring: ).scalars().all() ids_before = {f.id for f in folders_before} - # Re-sync + # Re-sync with root_folder_id from first run await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", + root_folder_id=root_folder_id, ) folders_after = ( @@ -446,7 +456,6 @@ class TestFolderMirroring: async def test_f4_folder_id_assigned( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -459,17 +468,19 @@ class TestFolderMirroring: (daily / "today.md").write_text("today note") (tmp_path / "root.md").write_text("root note") - await index_local_folder( + _, _, root_folder_id, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) docs = ( await db_session.execute( select(Document).where( - Document.connector_id == db_local_folder_connector.id + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == db_search_space.id, ) ) ).scalars().all() @@ -486,9 +497,7 @@ class TestFolderMirroring: assert today_doc.folder_id == daily_folder.id # Root doc should be in the root folder - await db_session.refresh(db_local_folder_connector) - root_fid = db_local_folder_connector.config.get("root_folder_id") - assert root_doc.folder_id == root_fid + assert root_doc.folder_id == root_folder_id @pytest.mark.usefixtures( "patched_self_hosted", @@ -499,7 +508,6 @@ class TestFolderMirroring: async def test_f5_empty_folder_cleanup( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, tmp_path: Path, @@ -515,11 +523,12 @@ class TestFolderMirroring: (daily / "today.md").write_text("today") (weekly / "review.md").write_text("review") - await index_local_folder( + _, _, root_folder_id, _ = await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", ) # Verify weekly folder exists @@ -535,9 +544,11 @@ class TestFolderMirroring: await index_local_folder( session=db_session, - connector_id=db_local_folder_connector.id, search_space_id=db_search_space.id, user_id=str(db_user.id), + folder_path=str(tmp_path), + folder_name="test-folder", + root_folder_id=root_folder_id, ) # weekly Folder should be gone (empty, dir removed) @@ -570,7 +581,6 @@ class TestPipelineIntegration: async def test_p1_local_folder_file_through_pipeline( self, db_session: AsyncSession, - db_local_folder_connector: SearchSourceConnector, db_user: User, db_search_space: SearchSpace, mocker, @@ -585,7 +595,7 @@ class TestPipelineIntegration: unique_id="test-folder:test.md", document_type=DocumentType.LOCAL_FOLDER_FILE, search_space_id=db_search_space.id, - connector_id=db_local_folder_connector.id, + connector_id=None, created_by_id=str(db_user.id), )