feat: refactor folder indexing to support batch processing of multiple files, enhancing performance and error handling

This commit is contained in:
Anish Sarkar 2026-04-03 10:02:36 +05:30
parent e2ba509314
commit 1fa8e1cc83
6 changed files with 293 additions and 80 deletions

View file

@ -1305,11 +1305,11 @@ class FolderIndexRequest(PydanticBaseModel):
enable_summary: bool = False
class FolderIndexFileRequest(PydanticBaseModel):
class FolderIndexFilesRequest(PydanticBaseModel):
folder_path: str
folder_name: str
search_space_id: int
target_file_path: str
target_file_paths: list[str]
root_folder_id: int | None = None
enable_summary: bool = False
@ -1393,14 +1393,15 @@ async def folder_index(
}
@router.post("/documents/folder-index-file")
async def folder_index_file(
request: FolderIndexFileRequest,
@router.post("/documents/folder-index-files")
async def folder_index_files(
request: FolderIndexFilesRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Index a single file within a watched folder (chokidar trigger).
Validates that target_file_path is under folder_path.
"""Index multiple files within a watched folder (batched chokidar trigger).
Validates that all target_file_paths are under folder_path.
Dispatches a single Celery task that processes them in parallel.
"""
from app.config import config as app_config
@ -1410,6 +1411,9 @@ async def folder_index_file(
detail="Local folder indexing is only available in self-hosted mode",
)
if not request.target_file_paths:
raise HTTPException(status_code=400, detail="target_file_paths must not be empty")
await check_permission(
session,
user,
@ -1420,13 +1424,14 @@ async def folder_index_file(
from pathlib import Path
try:
Path(request.target_file_path).relative_to(request.folder_path)
except ValueError:
raise HTTPException(
status_code=400,
detail="target_file_path must be inside folder_path",
)
for fp in request.target_file_paths:
try:
Path(fp).relative_to(request.folder_path)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"target_file_path {fp} must be inside folder_path",
)
from app.tasks.celery_tasks.document_tasks import index_local_folder_task
@ -1435,14 +1440,15 @@ async def folder_index_file(
user_id=str(user.id),
folder_path=request.folder_path,
folder_name=request.folder_name,
target_file_path=request.target_file_path,
target_file_paths=request.target_file_paths,
root_folder_id=request.root_folder_id,
enable_summary=request.enable_summary,
)
return {
"message": "File indexing started",
"message": f"Batch indexing started for {len(request.target_file_paths)} file(s)",
"status": "processing",
"file_count": len(request.target_file_paths),
}

View file

@ -1275,7 +1275,7 @@ def index_local_folder_task(
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_path: str | None = None,
target_file_paths: list[str] | None = None,
):
"""Celery task to index a local folder. Config is passed directly — no connector row."""
loop = asyncio.new_event_loop()
@ -1292,7 +1292,7 @@ def index_local_folder_task(
file_extensions=file_extensions,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_path=target_file_path,
target_file_paths=target_file_paths,
)
)
finally:
@ -1308,19 +1308,103 @@ async def _index_local_folder_async(
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_path: str | None = None,
target_file_paths: list[str] | None = None,
):
"""Run local folder indexing with a fresh DB session."""
"""Run local folder indexing with notification + heartbeat."""
is_batch = bool(target_file_paths)
is_full_scan = not target_file_paths
file_count = len(target_file_paths) if target_file_paths else None
if is_batch:
doc_name = f"{folder_name} ({file_count} file{'s' if file_count != 1 else ''})"
else:
doc_name = folder_name
notification = None
heartbeat_task = None
async with get_celery_session_maker()() as session:
await index_local_folder(
session=session,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
exclude_patterns=exclude_patterns,
file_extensions=file_extensions,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_path=target_file_path,
)
try:
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="LOCAL_FOLDER_FILE",
document_name=doc_name,
search_space_id=search_space_id,
)
)
_start_heartbeat(notification.id)
heartbeat_task = asyncio.create_task(
_run_heartbeat_loop(notification.id)
)
except Exception:
logger.warning(
"Failed to create notification for local folder indexing",
exc_info=True,
)
async def _heartbeat_progress(completed_count: int) -> None:
"""Refresh heartbeat and optionally update notification progress."""
if notification:
try:
await NotificationService.document_processing.notify_processing_progress(
session=session,
notification=notification,
stage="indexing",
stage_message=f"Syncing files ({completed_count}/{file_count or '?'})",
)
except Exception:
pass
try:
indexed, skipped_or_failed, _rfid, err = await index_local_folder(
session=session,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
exclude_patterns=exclude_patterns,
file_extensions=file_extensions,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_paths=target_file_paths,
on_heartbeat_callback=_heartbeat_progress if (is_batch or is_full_scan) else None,
)
if notification:
try:
if err:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=err,
)
else:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
)
except Exception:
logger.warning(
"Failed to update notification after local folder indexing",
exc_info=True,
)
except Exception as e:
logger.exception(f"Local folder indexing failed: {e}")
if notification:
try:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:200],
)
except Exception:
pass
raise
finally:
if heartbeat_task:
heartbeat_task.cancel()
if notification:
_stop_heartbeat(notification.id)

View file

@ -3,7 +3,7 @@ Local folder indexer.
Indexes files from a local folder on disk. Supports:
- Full-scan mode (startup reconciliation / manual trigger)
- Single-file mode (chokidar real-time trigger)
- Batch mode (chokidar real-time trigger, 1..N files)
- Filesystem folder structure mirroring into DB Folder rows
- Document versioning via create_version_snapshot
- ETL-based file parsing for binary formats (PDF, DOCX, images, audio, etc.)
@ -13,6 +13,7 @@ Config (folder_path, exclude_patterns, etc.) is passed in from the caller —
no connector row is read.
"""
import asyncio
import os
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
@ -34,6 +35,7 @@ from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker
from app.utils.document_versioning import create_version_snapshot
from .base import (
@ -497,14 +499,15 @@ async def index_local_folder(
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_path: str | None = None,
target_file_paths: list[str] | None = None,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, int | None, str | None]:
"""Index files from a local folder.
Supports two modes:
- Full scan (target_file_path=None): walks entire folder, handles new/changed/deleted files.
- Single-file (target_file_path set): processes only that file.
- Batch (target_file_paths set): processes 1..N files.
Single-file uses the caller's session; multi-file fans out with per-file sessions.
- Full scan (no target paths): walks entire folder, handles new/changed/deleted files.
Returns (indexed_count, skipped_count, root_folder_id, error_or_warning_message).
"""
@ -517,7 +520,7 @@ async def index_local_folder(
metadata={
"folder_path": folder_path,
"user_id": str(user_id),
"target_file_path": target_file_path,
"target_file_paths_count": len(target_file_paths) if target_file_paths else None,
},
)
@ -535,22 +538,47 @@ async def index_local_folder(
exclude_patterns = DEFAULT_EXCLUDE_PATTERNS
# ====================================================================
# SINGLE-FILE MODE
# BATCH MODE (1..N files)
# ====================================================================
if target_file_path:
indexed, skipped, err = await _index_single_file(
session=session,
if target_file_paths:
if len(target_file_paths) == 1:
indexed, skipped, err = await _index_single_file(
session=session,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_path=target_file_paths[0],
enable_summary=enable_summary,
root_folder_id=root_folder_id,
task_logger=task_logger,
log_entry=log_entry,
)
return indexed, skipped, root_folder_id, err
indexed, failed, err = await _index_batch_files(
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_path=target_file_path,
target_file_paths=target_file_paths,
enable_summary=enable_summary,
root_folder_id=root_folder_id,
task_logger=task_logger,
log_entry=log_entry,
on_progress_callback=on_heartbeat_callback,
)
return indexed, skipped, root_folder_id, err
if err:
await task_logger.log_task_success(
log_entry,
f"Batch indexing: {indexed} indexed, {failed} failed",
{"indexed": indexed, "failed": failed},
)
else:
await task_logger.log_task_success(
log_entry,
f"Batch indexing complete: {indexed} indexed",
{"indexed": indexed, "failed": failed},
)
return indexed, failed, root_folder_id, err
# ====================================================================
# FULL-SCAN MODE
@ -822,6 +850,84 @@ async def index_local_folder(
return 0, 0, root_folder_id, str(e)
BATCH_CONCURRENCY = 5
async def _index_batch_files(
search_space_id: int,
user_id: str,
folder_path: str,
folder_name: str,
target_file_paths: list[str],
enable_summary: bool,
root_folder_id: int | None,
on_progress_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, str | None]:
"""Process multiple files in parallel with bounded concurrency.
Each file gets its own DB session so they can run concurrently.
Returns (indexed_count, failed_count, error_summary_or_none).
"""
semaphore = asyncio.Semaphore(BATCH_CONCURRENCY)
indexed = 0
failed = 0
errors: list[str] = []
lock = asyncio.Lock()
completed = 0
async def process_one(file_path: str) -> None:
nonlocal indexed, failed, completed
async with semaphore:
try:
async with get_celery_session_maker()() as file_session:
task_logger = TaskLoggingService(file_session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="local_folder_indexing",
source="local_folder_batch_indexing",
message=f"Batch: indexing {Path(file_path).name}",
metadata={"file_path": file_path},
)
ix, _sk, err = await _index_single_file(
session=file_session,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_path=file_path,
enable_summary=enable_summary,
root_folder_id=root_folder_id,
task_logger=task_logger,
log_entry=log_entry,
)
async with lock:
indexed += ix
if err:
failed += 1
errors.append(f"{Path(file_path).name}: {err}")
completed += 1
if on_progress_callback and completed % BATCH_CONCURRENCY == 0:
await on_progress_callback(completed)
except Exception as exc:
logger.exception(f"Batch: error processing {file_path}: {exc}")
async with lock:
failed += 1
completed += 1
errors.append(f"{Path(file_path).name}: {exc}")
await asyncio.gather(*[process_one(fp) for fp in target_file_paths])
if on_progress_callback:
await on_progress_callback(completed)
error_summary = None
if errors:
error_summary = f"{failed} file(s) failed: " + "; ".join(errors[:5])
if len(errors) > 5:
error_summary += f" ... and {len(errors) - 5} more"
return indexed, failed, error_summary
async def _index_single_file(
session: AsyncSession,
search_space_id: int,

View file

@ -215,7 +215,7 @@ class TestFullIndexer:
db_search_space: SearchSpace,
tmp_path: Path,
):
"""I5: Single-file mode only processes the specified file."""
"""I5: Batch mode with a single file only processes that file."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
(tmp_path / "a.md").write_text("File A")
@ -228,7 +228,7 @@ class TestFullIndexer:
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
target_file_path=str(tmp_path / "b.md"),
target_file_paths=[str(tmp_path / "b.md")],
)
assert count == 1
@ -507,7 +507,7 @@ class TestFolderMirroring:
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
target_file_path=str(sub / "new.md"),
target_file_paths=[str(sub / "new.md")],
root_folder_id=root_folder_id,
)
assert count == 1
@ -546,7 +546,7 @@ class TestFolderMirroring:
db_search_space: SearchSpace,
tmp_path: Path,
):
"""F7: Deleting the only file in a subfolder via single-file mode removes empty Folder rows."""
"""F7: Deleting the only file in a subfolder via batch mode removes empty Folder rows."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
sub = tmp_path / "notes" / "ephemeral"
@ -578,7 +578,7 @@ class TestFolderMirroring:
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
target_file_path=str(target),
target_file_paths=[str(target)],
root_folder_id=root_folder_id,
)

View file

@ -16,65 +16,82 @@ interface FileChangedEvent {
}
const DEBOUNCE_MS = 2000;
interface QueueItem {
event: FileChangedEvent;
interface BatchItem {
folderPath: string;
folderName: string;
searchSpaceId: number;
rootFolderId: number | null;
filePaths: string[];
ackIds: string[];
}
export function useFolderSync() {
const queueRef = useRef<QueueItem[]>([]);
const queueRef = useRef<BatchItem[]>([]);
const processingRef = useRef(false);
const debounceTimers = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map());
const pendingByKey = useRef<Map<string, QueueItem>>(new Map());
const pendingByFolder = useRef<Map<string, BatchItem>>(new Map());
const isMountedRef = useRef(false);
async function processQueue() {
if (processingRef.current) return;
processingRef.current = true;
while (queueRef.current.length > 0) {
const item = queueRef.current.shift()!;
const batch = queueRef.current.shift()!;
try {
await documentsApiService.folderIndexFile(item.event.searchSpaceId, {
folder_path: item.event.folderPath,
folder_name: item.event.folderName,
search_space_id: item.event.searchSpaceId,
target_file_path: item.event.fullPath,
root_folder_id: item.event.rootFolderId,
await documentsApiService.folderIndexFiles(batch.searchSpaceId, {
folder_path: batch.folderPath,
folder_name: batch.folderName,
search_space_id: batch.searchSpaceId,
target_file_paths: batch.filePaths,
root_folder_id: batch.rootFolderId,
});
const api = typeof window !== "undefined" ? window.electronAPI : null;
if (api?.acknowledgeFileEvents && item.ackIds.length > 0) {
await api.acknowledgeFileEvents(item.ackIds);
if (api?.acknowledgeFileEvents && batch.ackIds.length > 0) {
await api.acknowledgeFileEvents(batch.ackIds);
}
} catch (err) {
console.error("[FolderSync] Failed to trigger re-index:", err);
console.error("[FolderSync] Failed to trigger batch re-index:", err);
}
}
processingRef.current = false;
}
function enqueueWithDebounce(event: FileChangedEvent) {
const key = `${event.folderPath}:${event.relativePath}`;
const existing = pendingByKey.current.get(key);
const ackSet = new Set(existing?.ackIds ?? []);
ackSet.add(event.id);
pendingByKey.current.set(key, {
event,
ackIds: Array.from(ackSet),
});
const folderKey = event.folderPath;
const existing = pendingByFolder.current.get(folderKey);
const existingTimeout = debounceTimers.current.get(key);
if (existing) {
const pathSet = new Set(existing.filePaths);
pathSet.add(event.fullPath);
existing.filePaths = Array.from(pathSet);
if (!existing.ackIds.includes(event.id)) {
existing.ackIds.push(event.id);
}
} else {
pendingByFolder.current.set(folderKey, {
folderPath: event.folderPath,
folderName: event.folderName,
searchSpaceId: event.searchSpaceId,
rootFolderId: event.rootFolderId,
filePaths: [event.fullPath],
ackIds: [event.id],
});
}
const existingTimeout = debounceTimers.current.get(folderKey);
if (existingTimeout) clearTimeout(existingTimeout);
const timeout = setTimeout(() => {
debounceTimers.current.delete(key);
const pending = pendingByKey.current.get(key);
debounceTimers.current.delete(folderKey);
const pending = pendingByFolder.current.get(folderKey);
if (!pending) return;
pendingByKey.current.delete(key);
pendingByFolder.current.delete(folderKey);
queueRef.current.push(pending);
processQueue();
}, DEBOUNCE_MS);
debounceTimers.current.set(key, timeout);
debounceTimers.current.set(folderKey, timeout);
}
useEffect(() => {
@ -108,7 +125,7 @@ export function useFolderSync() {
clearTimeout(timeout);
}
debounceTimers.current.clear();
pendingByKey.current.clear();
pendingByFolder.current.clear();
};
}, []);
}

View file

@ -400,8 +400,8 @@ class DocumentsApiService {
return baseApiService.post(`/api/v1/documents/folder-index`, undefined, { body });
};
folderIndexFile = async (searchSpaceId: number, body: { folder_path: string; folder_name: string; search_space_id: number; target_file_path: string; root_folder_id?: number | null; enable_summary?: boolean }) => {
return baseApiService.post(`/api/v1/documents/folder-index-file`, undefined, { body });
folderIndexFiles = async (searchSpaceId: number, body: { folder_path: string; folder_name: string; search_space_id: number; target_file_paths: string[]; root_folder_id?: number | null; enable_summary?: boolean }) => {
return baseApiService.post(`/api/v1/documents/folder-index-files`, undefined, { body });
};
getWatchedFolders = async (searchSpaceId: number) => {