refactor: remove Local Folder connector and related tasks, implement new folder indexing endpoints

This commit is contained in:
Anish Sarkar 2026-04-02 22:21:31 +05:30
parent 149ccb97dd
commit 22ee5c99cc
9 changed files with 326 additions and 376 deletions

View file

@ -110,7 +110,6 @@ class SearchSourceConnectorType(StrEnum):
COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"
COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR"
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
LOCAL_FOLDER_CONNECTOR = "LOCAL_FOLDER_CONNECTOR"
class PodcastStatus(StrEnum):

View file

@ -2,6 +2,7 @@
import asyncio
from fastapi import APIRouter, Depends, Form, HTTPException, UploadFile
from pydantic import BaseModel as PydanticBaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
@ -11,6 +12,7 @@ from app.db import (
Document,
DocumentType,
DocumentVersion,
Folder,
Permission,
SearchSpace,
SearchSpaceMembership,
@ -1258,3 +1260,144 @@ async def restore_document_version(
"document_id": document_id,
"restored_version": version_number,
}
# ===== Local folder indexing endpoints =====
class FolderIndexRequest(PydanticBaseModel):
folder_path: str
folder_name: str
search_space_id: int
exclude_patterns: list[str] | None = None
file_extensions: list[str] | None = None
root_folder_id: int | None = None
enable_summary: bool = False
class FolderIndexFileRequest(PydanticBaseModel):
folder_path: str
folder_name: str
search_space_id: int
target_file_path: str
enable_summary: bool = False
@router.post("/documents/folder-index")
async def folder_index(
request: FolderIndexRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Full-scan index of a local folder. Creates the root Folder row synchronously
and dispatches the heavy indexing work to a Celery task.
Returns the root_folder_id so the desktop can persist it.
"""
from app.config import config as app_config
if not app_config.is_self_hosted():
raise HTTPException(
status_code=400,
detail="Local folder indexing is only available in self-hosted mode",
)
await check_permission(
session,
user,
request.search_space_id,
Permission.DOCUMENTS_CREATE.value,
"You don't have permission to create documents in this search space",
)
root_folder_id = request.root_folder_id
if root_folder_id:
existing = (
await session.execute(
select(Folder).where(Folder.id == root_folder_id)
)
).scalar_one_or_none()
if not existing:
root_folder_id = None
if not root_folder_id:
root_folder = Folder(
name=request.folder_name,
search_space_id=request.search_space_id,
created_by_id=str(user.id),
position="a0",
)
session.add(root_folder)
await session.flush()
root_folder_id = root_folder.id
await session.commit()
from app.tasks.celery_tasks.document_tasks import index_local_folder_task
index_local_folder_task.delay(
search_space_id=request.search_space_id,
user_id=str(user.id),
folder_path=request.folder_path,
folder_name=request.folder_name,
exclude_patterns=request.exclude_patterns,
file_extensions=request.file_extensions,
root_folder_id=root_folder_id,
enable_summary=request.enable_summary,
)
return {
"message": "Folder indexing started",
"status": "processing",
"root_folder_id": root_folder_id,
}
@router.post("/documents/folder-index-file")
async def folder_index_file(
request: FolderIndexFileRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Index a single file within a watched folder (chokidar trigger).
Validates that target_file_path is under folder_path.
"""
from app.config import config as app_config
if not app_config.is_self_hosted():
raise HTTPException(
status_code=400,
detail="Local folder indexing is only available in self-hosted mode",
)
await check_permission(
session,
user,
request.search_space_id,
Permission.DOCUMENTS_CREATE.value,
"You don't have permission to create documents in this search space",
)
from pathlib import Path
try:
Path(request.target_file_path).relative_to(request.folder_path)
except ValueError:
raise HTTPException(
status_code=400,
detail="target_file_path must be inside folder_path",
)
from app.tasks.celery_tasks.document_tasks import index_local_folder_task
index_local_folder_task.delay(
search_space_id=request.search_space_id,
user_id=str(user.id),
folder_path=request.folder_path,
folder_name=request.folder_name,
target_file_path=request.target_file_path,
enable_summary=request.enable_summary,
)
return {
"message": "File indexing started",
"status": "processing",
}

View file

@ -1170,24 +1170,6 @@ async def index_connector_content(
)
response_message = "Obsidian vault indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR:
from app.config import config as app_config
from app.tasks.celery_tasks.connector_tasks import index_local_folder_task
if not app_config.is_self_hosted():
raise HTTPException(
status_code=400,
detail="Local folder connector is only available in self-hosted mode",
)
logger.info(
f"Triggering local folder indexing for connector {connector_id} into search space {search_space_id}"
)
index_local_folder_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = "Local folder indexing started in the background."
elif (
connector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
@ -1320,76 +1302,6 @@ async def index_connector_content(
) from e
class IndexFileRequest(BaseModel):
file_path: str = Field(..., description="Absolute path to the file to index")
@router.post(
"/search-source-connectors/{connector_id}/index-file",
response_model=dict[str, Any],
)
async def index_single_file(
connector_id: int,
body: IndexFileRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Index a single file from a local folder connector (chokidar real-time trigger)."""
from app.config import config as app_config
from app.tasks.celery_tasks.connector_tasks import index_local_folder_task
if not app_config.is_self_hosted():
raise HTTPException(
status_code=400,
detail="Local folder connector is only available in self-hosted mode",
)
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(status_code=404, detail="Local folder connector not found")
await check_permission(session, user, connector.search_space_id, Permission.CONNECTORS_UPDATE.value)
folder_path = connector.config.get("folder_path", "")
# Security: resolve symlinks and verify the file is inside folder_path
try:
resolved_file = os.path.realpath(body.file_path)
resolved_folder = os.path.realpath(folder_path)
if not resolved_file.startswith(resolved_folder + os.sep) and resolved_file != resolved_folder:
raise HTTPException(
status_code=403,
detail="File path is outside the configured folder",
)
except (OSError, ValueError):
raise HTTPException(
status_code=403,
detail="Invalid file path",
)
index_local_folder_task.delay(
connector_id,
connector.search_space_id,
str(user.id),
None,
None,
target_file_path=resolved_file,
)
return {
"message": "Single file indexing started",
"connector_id": connector_id,
"file_path": body.file_path,
}
async def _update_connector_timestamp_by_id(session: AsyncSession, connector_id: int):
"""
Update the last_indexed_at timestamp for a connector by its ID.
@ -3166,62 +3078,6 @@ async def run_obsidian_indexing(
)
async def run_local_folder_indexing_with_new_session(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
target_file_path: str | None = None,
):
"""Wrapper to run local folder indexing with its own database session."""
logger.info(
f"Background task started: Indexing local folder connector {connector_id} into space {search_space_id}"
)
async with async_session_maker() as session:
await run_local_folder_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date,
target_file_path=target_file_path,
)
logger.info(f"Background task finished: Indexing local folder connector {connector_id}")
async def run_local_folder_indexing(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
target_file_path: str | None = None,
):
"""Background task to run local folder indexing."""
from app.tasks.connector_indexers import index_local_folder
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=lambda session, connector_id, search_space_id, user_id,
start_date, end_date, update_last_indexed, on_heartbeat_callback: index_local_folder(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=update_last_indexed,
on_heartbeat_callback=on_heartbeat_callback,
target_file_path=target_file_path,
),
update_timestamp_func=_update_connector_timestamp_by_id,
supports_heartbeat_callback=True,
)
async def run_composio_indexing_with_new_session(
connector_id: int,
search_space_id: int,

View file

@ -926,52 +926,6 @@ async def _index_obsidian_vault(
)
@celery_app.task(name="index_local_folder", bind=True)
def index_local_folder_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str = None,
end_date: str = None,
target_file_path: str = None,
):
"""Celery task to index a local folder."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_local_folder(
connector_id, search_space_id, user_id, start_date, end_date, target_file_path
)
)
finally:
loop.close()
async def _index_local_folder(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str = None,
end_date: str = None,
target_file_path: str = None,
):
"""Index local folder with new session."""
from app.routes.search_source_connectors_routes import (
run_local_folder_indexing,
)
async with get_celery_session_maker()() as session:
await run_local_folder_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date,
target_file_path=target_file_path,
)
@celery_app.task(name="index_composio_connector", bind=True)
def index_composio_connector_task(
self,

View file

@ -10,6 +10,7 @@ from app.config import config
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
from app.tasks.document_processors import (
add_extension_received_document,
add_youtube_video_document,
@ -1243,3 +1244,68 @@ async def _process_circleback_meeting(
heartbeat_task.cancel()
if notification:
_stop_heartbeat(notification.id)
# ===== Local folder indexing task =====
@celery_app.task(name="index_local_folder", bind=True)
def index_local_folder_task(
self,
search_space_id: int,
user_id: str,
folder_path: str,
folder_name: str,
exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_path: str | None = None,
):
"""Celery task to index a local folder. Config is passed directly — no connector row."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_local_folder_async(
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
exclude_patterns=exclude_patterns,
file_extensions=file_extensions,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_path=target_file_path,
)
)
finally:
loop.close()
async def _index_local_folder_async(
search_space_id: int,
user_id: str,
folder_path: str,
folder_name: str,
exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_path: str | None = None,
):
"""Run local folder indexing with a fresh DB session."""
async with get_celery_session_maker()() as session:
await index_local_folder(
session=session,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
exclude_patterns=exclude_patterns,
file_extensions=file_extensions,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_path=target_file_path,
)

View file

@ -44,7 +44,6 @@ from .jira_indexer import index_jira_issues
from .linear_indexer import index_linear_issues
# Documentation and knowledge management
from .local_folder_indexer import index_local_folder
from .luma_indexer import index_luma_events
from .notion_indexer import index_notion_pages
from .obsidian_indexer import index_obsidian_vault
@ -75,5 +74,4 @@ __all__ = [ # noqa: RUF022
# Communication platforms
"index_slack_messages",
"index_google_gmail_messages",
"index_local_folder",
]

View file

@ -1,5 +1,5 @@
"""
Local folder connector indexer.
Local folder indexer.
Indexes files from a local folder on disk. Supports:
- Full-scan mode (startup reconciliation / manual trigger)
@ -8,7 +8,9 @@ Indexes files from a local folder on disk. Supports:
- Document versioning via create_version_snapshot
- ETL-based file parsing for binary formats (PDF, DOCX, images, audio, etc.)
Electron-only: all change detection is driven by chokidar in the desktop app.
Desktop-only: all change detection is driven by chokidar in the desktop app.
Config (folder_path, exclude_patterns, etc.) is passed in from the caller
no connector row is read.
"""
import os
@ -17,10 +19,9 @@ from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
from sqlalchemy import delete, select
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.db import (
@ -28,7 +29,6 @@ from app.db import (
DocumentStatus,
DocumentType,
Folder,
SearchSourceConnectorType,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
@ -45,11 +45,9 @@ from .base import (
build_document_metadata_string,
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
PLAINTEXT_EXTENSIONS = frozenset({
@ -131,12 +129,10 @@ def scan_folder(
for dirpath, dirnames, filenames in os.walk(root):
rel_dir = Path(dirpath).relative_to(root)
# Prune excluded directories in-place so os.walk skips them
dirnames[:] = [
d for d in dirnames if d not in exclude_patterns
]
# Check if the current directory itself is excluded
if any(part in exclude_patterns for part in rel_dir.parts):
continue
@ -232,20 +228,18 @@ async def _mirror_folder_structure(
folder_name: str,
search_space_id: int,
user_id: str,
connector_config: dict,
connector,
root_folder_id: int | None = None,
exclude_patterns: list[str] | None = None,
) -> dict[str, int]:
) -> tuple[dict[str, int], int]:
"""Mirror the local filesystem directory structure into DB Folder rows.
Returns a mapping of relative_dir_path -> folder_id.
The empty string key ("") maps to the root folder.
Returns (mapping, root_folder_id) where mapping is
relative_dir_path -> folder_id. The empty string key maps to the root folder.
"""
root = Path(folder_path)
if exclude_patterns is None:
exclude_patterns = []
# Collect all subdirectory paths relative to root
subdirs: list[str] = []
for dirpath, dirnames, _ in os.walk(root):
dirnames[:] = [d for d in dirnames if d not in exclude_patterns]
@ -256,13 +250,10 @@ async def _mirror_folder_structure(
if rel_str:
subdirs.append(rel_str)
# Sort by depth so parents are created before children
subdirs.sort(key=lambda p: p.count(os.sep))
mapping: dict[str, int] = {}
# Get or create root folder
root_folder_id = connector_config.get("root_folder_id")
if root_folder_id:
existing = (
await session.execute(
@ -284,12 +275,8 @@ async def _mirror_folder_structure(
session.add(root_folder)
await session.flush()
mapping[""] = root_folder.id
# Persist root_folder_id into connector config
connector_config["root_folder_id"] = root_folder.id
connector.config = {**connector.config, "root_folder_id": root_folder.id}
flag_modified(connector, "config")
root_folder_id = root_folder.id
# Create/reuse subdirectory Folder rows
for rel_dir in subdirs:
dir_parts = Path(rel_dir).parts
dir_name = dir_parts[-1]
@ -322,7 +309,7 @@ async def _mirror_folder_structure(
mapping[rel_dir] = new_folder.id
await session.flush()
return mapping
return mapping, root_folder_id
async def _cleanup_empty_folders(
@ -332,16 +319,11 @@ async def _cleanup_empty_folders(
existing_dirs_on_disk: set[str],
folder_mapping: dict[str, int],
) -> None:
"""Delete Folder rows that are empty (no docs, no children) and no longer on disk.
"""Delete Folder rows that are empty (no docs, no children) and no longer on disk."""
from sqlalchemy import delete as sa_delete
Queries ALL folders under this search space (not just the current mapping)
so that stale folders from previous syncs are also cleaned up.
"""
# Build a reverse mapping from folder_id → rel_dir for known dirs
id_to_rel: dict[int, str] = {fid: rel for rel, fid in folder_mapping.items() if rel}
# Also find any folders in the DB that are children of the root but NOT
# in the current mapping (stale from a previous sync).
all_folders = (
await session.execute(
select(Folder).where(
@ -351,7 +333,6 @@ async def _cleanup_empty_folders(
)
).scalars().all()
# Build candidates: folders not on disk that we might delete
candidates: list[Folder] = []
for folder in all_folders:
rel = id_to_rel.get(folder.id)
@ -359,8 +340,6 @@ async def _cleanup_empty_folders(
continue
candidates.append(folder)
# Sort deepest first (by name depth heuristic — folders with no children first)
# Repeat until no more deletions happen (cascading empty parents)
changed = True
while changed:
changed = False
@ -384,57 +363,46 @@ async def _cleanup_empty_folders(
remaining.append(folder)
continue
await session.execute(delete(Folder).where(Folder.id == folder.id))
await session.execute(sa_delete(Folder).where(Folder.id == folder.id))
changed = True
candidates = remaining
async def index_local_folder(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
folder_path: str,
folder_name: str,
exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_path: str | None = None,
) -> tuple[int, int, str | None]:
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, int | None, str | None]:
"""Index files from a local folder.
Supports two modes:
- Full scan (target_file_path=None): walks entire folder, handles new/changed/deleted files.
- Single-file (target_file_path set): processes only that file.
Returns (indexed_count, skipped_count, error_or_warning_message).
Returns (indexed_count, skipped_count, root_folder_id, error_or_warning_message).
"""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="local_folder_indexing",
source="connector_indexing_task",
message=f"Starting local folder indexing for connector {connector_id}",
source="local_folder_indexing_task",
message=f"Starting local folder indexing for {folder_name}",
metadata={
"connector_id": connector_id,
"folder_path": folder_path,
"user_id": str(user_id),
"target_file_path": target_file_path,
},
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR
)
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector {connector_id} not found",
"Connector not found",
{},
)
return 0, 0, f"Connector {connector_id} not found"
folder_path = connector.config.get("folder_path")
if not folder_path or not os.path.exists(folder_path):
await task_logger.log_task_failure(
log_entry,
@ -442,59 +410,54 @@ async def index_local_folder(
"Folder not found",
{},
)
return 0, 0, f"Folder path missing or does not exist: {folder_path}"
return 0, 0, root_folder_id, f"Folder path missing or does not exist: {folder_path}"
folder_name = connector.config.get("folder_name") or os.path.basename(folder_path)
exclude_patterns = connector.config.get("exclude_patterns", DEFAULT_EXCLUDE_PATTERNS)
file_extensions = connector.config.get("file_extensions") # None = all
if exclude_patterns is None:
exclude_patterns = DEFAULT_EXCLUDE_PATTERNS
# ====================================================================
# SINGLE-FILE MODE
# ====================================================================
if target_file_path:
return await _index_single_file(
indexed, skipped, err = await _index_single_file(
session=session,
connector=connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_path=target_file_path,
enable_summary=enable_summary,
task_logger=task_logger,
log_entry=log_entry,
update_last_indexed=update_last_indexed,
)
return indexed, skipped, root_folder_id, err
# ====================================================================
# FULL-SCAN MODE
# ====================================================================
# Phase 0: Mirror folder structure
await task_logger.log_task_progress(
log_entry, "Mirroring folder structure", {"stage": "folder_mirror"}
)
folder_mapping = await _mirror_folder_structure(
folder_mapping, root_folder_id = await _mirror_folder_structure(
session=session,
folder_path=folder_path,
folder_name=folder_name,
search_space_id=search_space_id,
user_id=user_id,
connector_config=connector.config,
connector=connector,
root_folder_id=root_folder_id,
exclude_patterns=exclude_patterns,
)
await session.flush()
# Scan files on disk
try:
files = scan_folder(folder_path, file_extensions, exclude_patterns)
except Exception as e:
await task_logger.log_task_failure(
log_entry, f"Failed to scan folder: {e}", "Scan error", {}
)
return 0, 0, f"Failed to scan folder: {e}"
return 0, 0, root_folder_id, f"Failed to scan folder: {e}"
logger.info(f"Found {len(files)} files in folder")
@ -530,7 +493,6 @@ async def index_local_folder(
)
if existing_document:
# Check mtime first (cheap)
stored_mtime = (existing_document.document_metadata or {}).get("mtime")
current_mtime = file_info["modified_at"].timestamp()
@ -542,7 +504,6 @@ async def index_local_folder(
skipped_count += 1
continue
# mtime differs — read file and check content hash
try:
content, content_hash = await _compute_file_content_hash(
file_path_abs, file_info["relative_path"], search_space_id
@ -553,7 +514,6 @@ async def index_local_folder(
continue
if existing_document.content_hash == content_hash:
# Content same, just update mtime in metadata
meta = dict(existing_document.document_metadata or {})
meta["mtime"] = current_mtime
existing_document.document_metadata = meta
@ -564,7 +524,6 @@ async def index_local_folder(
skipped_count += 1
continue
# Content actually changed — snapshot version, queue for re-index
await create_version_snapshot(session, existing_document)
files_to_process.append(
@ -581,7 +540,6 @@ async def index_local_folder(
)
continue
# New document — read content
try:
content, content_hash = await _compute_file_content_hash(
file_path_abs, file_info["relative_path"], search_space_id
@ -595,7 +553,6 @@ async def index_local_folder(
skipped_count += 1
continue
# Check for duplicate content from another connector
with session.no_autoflush:
dup = await check_duplicate_document_by_hash(session, content_hash)
if dup:
@ -603,7 +560,6 @@ async def index_local_folder(
skipped_count += 1
continue
# Determine folder_id for this file
parent_dir = str(Path(relative_path).parent)
if parent_dir == ".":
parent_dir = ""
@ -616,17 +572,16 @@ async def index_local_folder(
document_metadata={
"folder_name": folder_name,
"file_path": relative_path,
"connector_id": connector_id,
"mtime": file_info["modified_at"].timestamp(),
},
content="Pending...",
content_hash=unique_identifier_hash, # Temp unique — updated in phase 2
content_hash=unique_identifier_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=None,
status=DocumentStatus.pending(),
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
connector_id=None,
folder_id=folder_id,
)
session.add(document)
@ -655,16 +610,17 @@ async def index_local_folder(
# ================================================================
# PHASE 1.5: Delete documents no longer on disk
# ================================================================
all_connector_docs = (
all_folder_docs = (
await session.execute(
select(Document).where(
Document.connector_id == connector_id,
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == search_space_id,
Document.folder_id.in_(list(folder_mapping.values())),
)
)
).scalars().all()
for doc in all_connector_docs:
for doc in all_folder_docs:
if doc.unique_identifier_hash not in seen_unique_hashes:
await session.delete(doc)
@ -709,7 +665,7 @@ async def index_local_folder(
document_string = build_document_metadata_string(metadata_sections)
summary_content = ""
if long_context_llm and connector.enable_summary:
if long_context_llm and enable_summary:
doc_meta = {
"folder_name": folder_name,
"file_path": relative_path,
@ -721,7 +677,6 @@ async def index_local_folder(
embedding = embed_text(document_string)
chunks = await create_document_chunks(document_string)
# Determine folder_id
parent_dir = str(Path(relative_path).parent)
if parent_dir == ".":
parent_dir = ""
@ -735,7 +690,6 @@ async def index_local_folder(
document.document_metadata = {
"folder_name": folder_name,
"file_path": relative_path,
"connector_id": connector_id,
"summary": summary_content,
"mtime": file_info["modified_at"].timestamp(),
}
@ -782,8 +736,6 @@ async def index_local_folder(
session, root_fid, search_space_id, existing_dirs, folder_mapping
)
await update_connector_last_indexed(session, connector, update_last_indexed)
try:
await session.commit()
except Exception as e:
@ -802,7 +754,7 @@ async def index_local_folder(
await task_logger.log_task_success(
log_entry,
f"Completed local folder indexing for connector {connector_id}",
f"Completed local folder indexing for {folder_name}",
{
"indexed": indexed_count,
"skipped": skipped_count,
@ -811,7 +763,7 @@ async def index_local_folder(
},
)
return indexed_count, skipped_count, warning_message
return indexed_count, skipped_count, root_folder_id, warning_message
except SQLAlchemyError as e:
logger.exception(f"Database error during local folder indexing: {e}")
@ -819,34 +771,31 @@ async def index_local_folder(
await task_logger.log_task_failure(
log_entry, f"DB error: {e}", "Database error", {}
)
return 0, 0, f"Database error: {e}"
return 0, 0, root_folder_id, f"Database error: {e}"
except Exception as e:
logger.exception(f"Error during local folder indexing: {e}")
await task_logger.log_task_failure(
log_entry, f"Error: {e}", "Unexpected error", {}
)
return 0, 0, str(e)
return 0, 0, root_folder_id, str(e)
async def _index_single_file(
session: AsyncSession,
connector,
connector_id: int,
search_space_id: int,
user_id: str,
folder_path: str,
folder_name: str,
target_file_path: str,
enable_summary: bool,
task_logger,
log_entry,
update_last_indexed: bool = True,
) -> tuple[int, int, str | None]:
"""Process a single file (chokidar real-time trigger)."""
try:
full_path = Path(target_file_path)
if not full_path.exists():
# File was deleted — find and remove the document
rel = str(full_path.relative_to(folder_path))
unique_id = f"{folder_name}:{rel}"
uid_hash = generate_unique_identifier_hash(
@ -880,7 +829,6 @@ async def _index_single_file(
if existing:
if existing.content_hash == content_hash:
# Update mtime
mtime = full_path.stat().st_mtime
meta = dict(existing.document_metadata or {})
meta["mtime"] = mtime
@ -888,10 +836,8 @@ async def _index_single_file(
await session.commit()
return 0, 1, None
# Content changed — snapshot + re-index
await create_version_snapshot(session, existing)
# Get LLM
long_context_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
@ -906,7 +852,7 @@ async def _index_single_file(
document_string = build_document_metadata_string(metadata_sections)
summary_content = ""
if long_context_llm and connector.enable_summary:
if long_context_llm and enable_summary:
summary_content, _ = await generate_document_summary(
document_string, long_context_llm, {"folder_name": folder_name, "file_path": rel_path}
)
@ -917,7 +863,6 @@ async def _index_single_file(
doc_metadata = {
"folder_name": folder_name,
"file_path": rel_path,
"connector_id": connector_id,
"summary": summary_content,
"mtime": mtime,
}
@ -946,16 +891,14 @@ async def _index_single_file(
status=DocumentStatus.ready(),
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
connector_id=None,
)
session.add(document)
# Set chunks
await session.flush()
for chunk in chunks:
chunk.document_id = document.id
session.add_all(chunks)
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
await task_logger.log_task_success(

View file

@ -168,22 +168,3 @@ def make_connector_document(db_connector, db_user):
return _make
@pytest_asyncio.fixture
async def db_local_folder_connector(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace, tmp_path
) -> SearchSourceConnector:
connector = SearchSourceConnector(
name="Test Local Folder",
connector_type=SearchSourceConnectorType.LOCAL_FOLDER_CONNECTOR,
config={
"folder_path": str(tmp_path),
"folder_name": "test-folder",
"exclude_patterns": [],
"file_extensions": None,
},
search_space_id=db_search_space.id,
user_id=db_user.id,
)
db_session.add(connector)
await db_session.flush()
return connector

View file

@ -14,7 +14,6 @@ from app.db import (
DocumentType,
DocumentVersion,
Folder,
SearchSourceConnector,
SearchSpace,
User,
)
@ -72,7 +71,6 @@ class TestFullIndexer:
async def test_i1_new_file_indexed(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -82,11 +80,12 @@ class TestFullIndexer:
(tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
count, skipped, err = await index_local_folder(
count, skipped, root_folder_id, err = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
assert err is None
@ -95,7 +94,8 @@ class TestFullIndexer:
docs = (
await db_session.execute(
select(Document).where(
Document.connector_id == db_local_folder_connector.id
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == db_search_space.id,
)
)
).scalars().all()
@ -112,7 +112,6 @@ class TestFullIndexer:
async def test_i2_unchanged_skipped(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -122,27 +121,31 @@ class TestFullIndexer:
(tmp_path / "note.md").write_text("# Hello\n\nSame content.")
count1, _, _ = await index_local_folder(
count1, _, root_folder_id, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
assert count1 == 1
# Second run — unchanged
count2, _, _ = await index_local_folder(
# Second run — unchanged, pass root_folder_id from first run
count2, _, _, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
root_folder_id=root_folder_id,
)
assert count2 == 0
total = (
await db_session.execute(
select(func.count()).select_from(Document).where(
Document.connector_id == db_local_folder_connector.id
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == db_search_space.id,
)
)
).scalar_one()
@ -157,7 +160,6 @@ class TestFullIndexer:
async def test_i3_changed_reindexed(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -168,11 +170,12 @@ class TestFullIndexer:
f = tmp_path / "note.md"
f.write_text("# Version 1\n\nOriginal.")
await index_local_folder(
_, _, root_folder_id, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
# Modify
@ -180,11 +183,13 @@ class TestFullIndexer:
# Touch mtime to ensure it's detected as different
os.utime(f, (f.stat().st_atime + 10, f.stat().st_mtime + 10))
count, _, _ = await index_local_folder(
count, _, _, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
root_folder_id=root_folder_id,
)
assert count == 1
@ -192,7 +197,8 @@ class TestFullIndexer:
versions = (
await db_session.execute(
select(DocumentVersion).join(Document).where(
Document.connector_id == db_local_folder_connector.id
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == db_search_space.id,
)
)
).scalars().all()
@ -207,7 +213,6 @@ class TestFullIndexer:
async def test_i4_deleted_removed(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -218,17 +223,19 @@ class TestFullIndexer:
f = tmp_path / "to_delete.md"
f.write_text("# Delete me")
await index_local_folder(
_, _, root_folder_id, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
docs_before = (
await db_session.execute(
select(func.count()).select_from(Document).where(
Document.connector_id == db_local_folder_connector.id
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == db_search_space.id,
)
)
).scalar_one()
@ -238,15 +245,18 @@ class TestFullIndexer:
await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
root_folder_id=root_folder_id,
)
docs_after = (
await db_session.execute(
select(func.count()).select_from(Document).where(
Document.connector_id == db_local_folder_connector.id
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == db_search_space.id,
)
)
).scalar_one()
@ -261,7 +271,6 @@ class TestFullIndexer:
async def test_i5_single_file_mode(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -273,11 +282,12 @@ class TestFullIndexer:
(tmp_path / "b.md").write_text("File B")
(tmp_path / "c.md").write_text("File C")
count, _, _ = await index_local_folder(
count, _, _, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
target_file_path=str(tmp_path / "b.md"),
)
assert count == 1
@ -285,12 +295,13 @@ class TestFullIndexer:
docs = (
await db_session.execute(
select(Document).where(
Document.connector_id == db_local_folder_connector.id
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == db_search_space.id,
)
)
).scalars().all()
assert len(docs) == 1
assert docs[0].title == "b"
assert docs[0].title == "b.md"
# ====================================================================
@ -309,30 +320,27 @@ class TestFolderMirroring:
async def test_f1_root_folder_created(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
):
"""F1: First sync creates a root Folder and stores root_folder_id."""
"""F1: First sync creates a root Folder and returns root_folder_id."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
(tmp_path / "root.md").write_text("Root file")
await index_local_folder(
_, _, root_folder_id, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
# Refresh connector
await db_session.refresh(db_local_folder_connector)
root_id = db_local_folder_connector.config.get("root_folder_id")
assert root_id is not None
assert root_folder_id is not None
root_folder = (
await db_session.execute(select(Folder).where(Folder.id == root_id))
await db_session.execute(select(Folder).where(Folder.id == root_folder_id))
).scalar_one()
assert root_folder.name == "test-folder"
@ -345,7 +353,6 @@ class TestFolderMirroring:
async def test_f2_nested_folder_rows(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -362,9 +369,10 @@ class TestFolderMirroring:
await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
folders = (
@ -394,7 +402,6 @@ class TestFolderMirroring:
async def test_f3_resync_reuses_folders(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -406,11 +413,12 @@ class TestFolderMirroring:
sub.mkdir()
(sub / "file.md").write_text("content")
await index_local_folder(
_, _, root_folder_id, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
folders_before = (
@ -420,12 +428,14 @@ class TestFolderMirroring:
).scalars().all()
ids_before = {f.id for f in folders_before}
# Re-sync
# Re-sync with root_folder_id from first run
await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
root_folder_id=root_folder_id,
)
folders_after = (
@ -446,7 +456,6 @@ class TestFolderMirroring:
async def test_f4_folder_id_assigned(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -459,17 +468,19 @@ class TestFolderMirroring:
(daily / "today.md").write_text("today note")
(tmp_path / "root.md").write_text("root note")
await index_local_folder(
_, _, root_folder_id, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
docs = (
await db_session.execute(
select(Document).where(
Document.connector_id == db_local_folder_connector.id
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == db_search_space.id,
)
)
).scalars().all()
@ -486,9 +497,7 @@ class TestFolderMirroring:
assert today_doc.folder_id == daily_folder.id
# Root doc should be in the root folder
await db_session.refresh(db_local_folder_connector)
root_fid = db_local_folder_connector.config.get("root_folder_id")
assert root_doc.folder_id == root_fid
assert root_doc.folder_id == root_folder_id
@pytest.mark.usefixtures(
"patched_self_hosted",
@ -499,7 +508,6 @@ class TestFolderMirroring:
async def test_f5_empty_folder_cleanup(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
@ -515,11 +523,12 @@ class TestFolderMirroring:
(daily / "today.md").write_text("today")
(weekly / "review.md").write_text("review")
await index_local_folder(
_, _, root_folder_id, _ = await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
# Verify weekly folder exists
@ -535,9 +544,11 @@ class TestFolderMirroring:
await index_local_folder(
session=db_session,
connector_id=db_local_folder_connector.id,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
root_folder_id=root_folder_id,
)
# weekly Folder should be gone (empty, dir removed)
@ -570,7 +581,6 @@ class TestPipelineIntegration:
async def test_p1_local_folder_file_through_pipeline(
self,
db_session: AsyncSession,
db_local_folder_connector: SearchSourceConnector,
db_user: User,
db_search_space: SearchSpace,
mocker,
@ -585,7 +595,7 @@ class TestPipelineIntegration:
unique_id="test-folder:test.md",
document_type=DocumentType.LOCAL_FOLDER_FILE,
search_space_id=db_search_space.id,
connector_id=db_local_folder_connector.id,
connector_id=None,
created_by_id=str(db_user.id),
)