feat: implement upload-based folder indexing and synchronization features

This commit is contained in:
Anish Sarkar 2026-04-08 15:46:52 +05:30
parent b3925654dd
commit 5f5954e932
13 changed files with 1273 additions and 45 deletions

View file

@ -1543,3 +1543,379 @@ async def folder_index_files(
"status": "processing",
"file_count": len(request.target_file_paths),
}
# ===== Upload-based local folder indexing endpoints =====
# These work for ALL deployment modes (cloud, self-hosted remote, self-hosted local).
# The desktop app reads files locally and uploads them here.
class FolderMtimeCheckFile(PydanticBaseModel):
relative_path: str
mtime: float
class FolderMtimeCheckRequest(PydanticBaseModel):
folder_name: str
search_space_id: int
files: list[FolderMtimeCheckFile]
class FolderUnlinkRequest(PydanticBaseModel):
folder_name: str
search_space_id: int
root_folder_id: int | None = None
relative_paths: list[str]
class FolderSyncFinalizeRequest(PydanticBaseModel):
folder_name: str
search_space_id: int
root_folder_id: int | None = None
all_relative_paths: list[str]
@router.post("/documents/folder-mtime-check")
async def folder_mtime_check(
request: FolderMtimeCheckRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Pre-upload optimization: check which files need uploading based on mtime.
Returns the subset of relative paths where the file is new or has a
different mtime, so the client can skip reading/uploading unchanged files.
"""
from app.indexing_pipeline.document_hashing import compute_identifier_hash
await check_permission(
session,
user,
request.search_space_id,
Permission.DOCUMENTS_CREATE.value,
"You don't have permission to create documents in this search space",
)
uid_hashes = {}
for f in request.files:
uid = f"{request.folder_name}:{f.relative_path}"
uid_hash = compute_identifier_hash(
DocumentType.LOCAL_FOLDER_FILE.value, uid, request.search_space_id
)
uid_hashes[uid_hash] = f
existing_docs = (
(
await session.execute(
select(Document).where(
Document.unique_identifier_hash.in_(list(uid_hashes.keys())),
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
)
)
)
.scalars()
.all()
)
existing_by_hash = {doc.unique_identifier_hash: doc for doc in existing_docs}
MTIME_TOLERANCE = 1.0
files_to_upload: list[str] = []
for uid_hash, file_info in uid_hashes.items():
doc = existing_by_hash.get(uid_hash)
if doc is None:
files_to_upload.append(file_info.relative_path)
continue
stored_mtime = (doc.document_metadata or {}).get("mtime")
if stored_mtime is None:
files_to_upload.append(file_info.relative_path)
continue
if abs(file_info.mtime - stored_mtime) >= MTIME_TOLERANCE:
files_to_upload.append(file_info.relative_path)
return {"files_to_upload": files_to_upload}
@router.post("/documents/folder-upload")
async def folder_upload(
files: list[UploadFile],
folder_name: str = Form(...),
search_space_id: int = Form(...),
relative_paths: str = Form(...),
root_folder_id: int | None = Form(None),
enable_summary: bool = Form(False),
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Upload files from the desktop app for folder indexing.
Files are written to temp storage and dispatched to a Celery task.
Works for all deployment modes (no is_self_hosted guard).
"""
import json
import tempfile
await check_permission(
session,
user,
search_space_id,
Permission.DOCUMENTS_CREATE.value,
"You don't have permission to create documents in this search space",
)
if not files:
raise HTTPException(status_code=400, detail="No files provided")
try:
rel_paths: list[str] = json.loads(relative_paths)
except (json.JSONDecodeError, TypeError) as e:
raise HTTPException(
status_code=400, detail=f"Invalid relative_paths JSON: {e}"
) from e
if len(rel_paths) != len(files):
raise HTTPException(
status_code=400,
detail=f"Mismatch: {len(files)} files but {len(rel_paths)} relative_paths",
)
for file in files:
file_size = file.size or 0
if file_size > MAX_FILE_SIZE_BYTES:
raise HTTPException(
status_code=413,
detail=f"File '{file.filename}' ({file_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
)
if not root_folder_id:
watched_metadata = {
"watched": True,
"folder_path": folder_name,
}
existing_root = (
await session.execute(
select(Folder).where(
Folder.name == folder_name,
Folder.parent_id.is_(None),
Folder.search_space_id == search_space_id,
)
)
).scalar_one_or_none()
if existing_root:
root_folder_id = existing_root.id
existing_root.folder_metadata = watched_metadata
else:
root_folder = Folder(
name=folder_name,
search_space_id=search_space_id,
created_by_id=str(user.id),
position="a0",
folder_metadata=watched_metadata,
)
session.add(root_folder)
await session.flush()
root_folder_id = root_folder.id
await session.commit()
async def _read_and_save(file: UploadFile, idx: int) -> dict:
content = await file.read()
filename = file.filename or rel_paths[idx].split("/")[-1]
def _write_temp() -> str:
with tempfile.NamedTemporaryFile(
delete=False, suffix=os.path.splitext(filename)[1]
) as tmp:
tmp.write(content)
return tmp.name
temp_path = await asyncio.to_thread(_write_temp)
return {
"temp_path": temp_path,
"relative_path": rel_paths[idx],
"filename": filename,
}
file_mappings = await asyncio.gather(
*(_read_and_save(f, i) for i, f in enumerate(files))
)
from app.tasks.celery_tasks.document_tasks import (
index_uploaded_folder_files_task,
)
index_uploaded_folder_files_task.delay(
search_space_id=search_space_id,
user_id=str(user.id),
folder_name=folder_name,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
file_mappings=list(file_mappings),
)
return {
"message": f"Folder upload started for {len(files)} file(s)",
"status": "processing",
"root_folder_id": root_folder_id,
"file_count": len(files),
}
@router.post("/documents/folder-unlink")
async def folder_unlink(
request: FolderUnlinkRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Handle file deletion events from the desktop watcher.
For each relative path, find the matching document and delete it.
"""
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.tasks.connector_indexers.local_folder_indexer import (
_cleanup_empty_folder_chain,
)
await check_permission(
session,
user,
request.search_space_id,
Permission.DOCUMENTS_DELETE.value,
"You don't have permission to delete documents in this search space",
)
deleted_count = 0
for rel_path in request.relative_paths:
unique_id = f"{request.folder_name}:{rel_path}"
uid_hash = compute_identifier_hash(
DocumentType.LOCAL_FOLDER_FILE.value,
unique_id,
request.search_space_id,
)
existing = (
await session.execute(
select(Document).where(
Document.unique_identifier_hash == uid_hash
)
)
).scalar_one_or_none()
if existing:
deleted_folder_id = existing.folder_id
await session.delete(existing)
await session.flush()
if deleted_folder_id and request.root_folder_id:
await _cleanup_empty_folder_chain(
session, deleted_folder_id, request.root_folder_id
)
deleted_count += 1
await session.commit()
return {"deleted_count": deleted_count}
@router.post("/documents/folder-sync-finalize")
async def folder_sync_finalize(
request: FolderSyncFinalizeRequest,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""Finalize a full folder scan by deleting orphaned documents.
The client sends the complete list of relative paths currently in the
folder. Any document in the DB for this folder that is NOT in the list
gets deleted.
"""
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.tasks.connector_indexers.local_folder_indexer import (
_cleanup_empty_folders,
)
await check_permission(
session,
user,
request.search_space_id,
Permission.DOCUMENTS_DELETE.value,
"You don't have permission to delete documents in this search space",
)
seen_hashes: set[str] = set()
for rel_path in request.all_relative_paths:
unique_id = f"{request.folder_name}:{rel_path}"
uid_hash = compute_identifier_hash(
DocumentType.LOCAL_FOLDER_FILE.value,
unique_id,
request.search_space_id,
)
seen_hashes.add(uid_hash)
all_root_folder_ids: set[int] = set()
if request.root_folder_id:
all_root_folder_ids.add(request.root_folder_id)
all_db_folders = (
(
await session.execute(
select(Folder.id).where(
Folder.search_space_id == request.search_space_id,
)
)
)
.scalars()
.all()
)
all_root_folder_ids.update(all_db_folders)
all_folder_docs = (
(
await session.execute(
select(Document).where(
Document.document_type == DocumentType.LOCAL_FOLDER_FILE,
Document.search_space_id == request.search_space_id,
Document.folder_id.in_(list(all_root_folder_ids))
if all_root_folder_ids
else True,
)
)
)
.scalars()
.all()
)
deleted_count = 0
for doc in all_folder_docs:
if doc.unique_identifier_hash not in seen_hashes:
await session.delete(doc)
deleted_count += 1
await session.flush()
if request.root_folder_id:
existing_dirs: set[str] = set()
for rel_path in request.all_relative_paths:
parent = str(os.path.dirname(rel_path))
if parent and parent != ".":
existing_dirs.add(parent)
folder_mapping: dict[str, int] = {}
if request.root_folder_id:
folder_mapping[""] = request.root_folder_id
await _cleanup_empty_folders(
session,
request.root_folder_id,
request.search_space_id,
existing_dirs,
folder_mapping,
)
await session.commit()
return {"deleted_count": deleted_count}

View file

@ -11,7 +11,10 @@ from app.config import config
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
from app.tasks.connector_indexers.local_folder_indexer import (
index_local_folder,
index_uploaded_files,
)
from app.tasks.document_processors import (
add_extension_received_document,
add_youtube_video_document,
@ -1411,3 +1414,132 @@ async def _index_local_folder_async(
heartbeat_task.cancel()
if notification_id is not None:
_stop_heartbeat(notification_id)
# ===== Upload-based folder indexing task =====
@celery_app.task(name="index_uploaded_folder_files", bind=True)
def index_uploaded_folder_files_task(
self,
search_space_id: int,
user_id: str,
folder_name: str,
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
):
"""Celery task to index files uploaded from the desktop app."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_uploaded_folder_files_async(
search_space_id=search_space_id,
user_id=user_id,
folder_name=folder_name,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
file_mappings=file_mappings,
)
)
finally:
loop.close()
async def _index_uploaded_folder_files_async(
search_space_id: int,
user_id: str,
folder_name: str,
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
):
"""Run upload-based folder indexing with notification + heartbeat."""
file_count = len(file_mappings)
doc_name = f"{folder_name} ({file_count} file{'s' if file_count != 1 else ''})"
notification = None
notification_id: int | None = None
heartbeat_task = None
async with get_celery_session_maker()() as session:
try:
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="LOCAL_FOLDER_FILE",
document_name=doc_name,
search_space_id=search_space_id,
)
)
notification_id = notification.id
_start_heartbeat(notification_id)
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification_id))
except Exception:
logger.warning(
"Failed to create notification for uploaded folder indexing",
exc_info=True,
)
async def _heartbeat_progress(completed_count: int) -> None:
if notification:
with contextlib.suppress(Exception):
await NotificationService.document_processing.notify_processing_progress(
session=session,
notification=notification,
stage="indexing",
stage_message=f"Syncing files ({completed_count}/{file_count})",
)
try:
_indexed, _failed, err = await index_uploaded_files(
session=session,
search_space_id=search_space_id,
user_id=user_id,
folder_name=folder_name,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
file_mappings=file_mappings,
on_heartbeat_callback=_heartbeat_progress,
)
if notification:
try:
await session.refresh(notification)
if err:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=err,
)
else:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
)
except Exception:
logger.warning(
"Failed to update notification after uploaded folder indexing",
exc_info=True,
)
except Exception as e:
logger.exception(f"Uploaded folder indexing failed: {e}")
if notification:
try:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:200],
)
except Exception:
pass
raise
finally:
if heartbeat_task:
heartbeat_task.cancel()
if notification_id is not None:
_stop_heartbeat(notification_id)

View file

@ -1081,3 +1081,294 @@ async def _index_single_file(
logger.exception(f"Error indexing single file {target_file_path}: {e}")
await session.rollback()
return 0, 0, str(e)
# ========================================================================
# Upload-based folder indexing (works for all deployment modes)
# ========================================================================
async def _mirror_folder_structure_from_paths(
session: AsyncSession,
relative_paths: list[str],
folder_name: str,
search_space_id: int,
user_id: str,
root_folder_id: int | None = None,
) -> tuple[dict[str, int], int]:
"""Create DB Folder rows from a list of relative file paths.
Unlike ``_mirror_folder_structure`` this does not walk the filesystem;
it derives the directory tree from the paths provided by the client.
Returns (mapping, root_folder_id) where mapping is
relative_dir_path -> folder_id. The empty-string key maps to root.
"""
dir_set: set[str] = set()
for rp in relative_paths:
parent = str(Path(rp).parent)
if parent == ".":
continue
parts = Path(parent).parts
for i in range(len(parts)):
dir_set.add(str(Path(*parts[: i + 1])))
subdirs = sorted(dir_set, key=lambda p: p.count(os.sep))
mapping: dict[str, int] = {}
if root_folder_id:
existing = (
await session.execute(select(Folder).where(Folder.id == root_folder_id))
).scalar_one_or_none()
if existing:
mapping[""] = existing.id
else:
root_folder_id = None
if not root_folder_id:
root_folder = Folder(
name=folder_name,
search_space_id=search_space_id,
created_by_id=user_id,
position="a0",
)
session.add(root_folder)
await session.flush()
mapping[""] = root_folder.id
root_folder_id = root_folder.id
for rel_dir in subdirs:
dir_parts = Path(rel_dir).parts
dir_name = dir_parts[-1]
parent_rel = str(Path(*dir_parts[:-1])) if len(dir_parts) > 1 else ""
parent_id = mapping.get(parent_rel, mapping[""])
existing_folder = (
await session.execute(
select(Folder).where(
Folder.name == dir_name,
Folder.parent_id == parent_id,
Folder.search_space_id == search_space_id,
)
)
).scalar_one_or_none()
if existing_folder:
mapping[rel_dir] = existing_folder.id
else:
new_folder = Folder(
name=dir_name,
parent_id=parent_id,
search_space_id=search_space_id,
created_by_id=user_id,
position="a0",
)
session.add(new_folder)
await session.flush()
mapping[rel_dir] = new_folder.id
await session.flush()
return mapping, root_folder_id
UPLOAD_BATCH_CONCURRENCY = 5
async def index_uploaded_files(
session: AsyncSession,
search_space_id: int,
user_id: str,
folder_name: str,
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, str | None]:
"""Index files uploaded from the desktop app via temp paths.
Each entry in *file_mappings* is ``{temp_path, relative_path, filename}``.
This function mirrors the folder structure from the provided relative
paths, then indexes each file exactly like ``_index_single_file`` but
reads from the temp path. Temp files are cleaned up after processing.
Returns ``(indexed_count, failed_count, error_summary_or_none)``.
"""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="local_folder_indexing",
source="uploaded_folder_indexing",
message=f"Indexing {len(file_mappings)} uploaded file(s) for {folder_name}",
metadata={"file_count": len(file_mappings)},
)
try:
all_relative_paths = [m["relative_path"] for m in file_mappings]
folder_mapping, root_folder_id = await _mirror_folder_structure_from_paths(
session=session,
relative_paths=all_relative_paths,
folder_name=folder_name,
search_space_id=search_space_id,
user_id=user_id,
root_folder_id=root_folder_id,
)
await session.flush()
page_limit_service = PageLimitService(session)
pipeline = IndexingPipelineService(session)
llm = await get_user_long_context_llm(session, user_id, search_space_id)
indexed_count = 0
failed_count = 0
errors: list[str] = []
for i, mapping in enumerate(file_mappings):
temp_path = mapping["temp_path"]
relative_path = mapping["relative_path"]
filename = mapping["filename"]
try:
unique_id = f"{folder_name}:{relative_path}"
uid_hash = compute_identifier_hash(
DocumentType.LOCAL_FOLDER_FILE.value,
unique_id,
search_space_id,
)
try:
estimated_pages = await _check_page_limit_or_skip(
page_limit_service, user_id, temp_path
)
except PageLimitExceededError:
logger.warning(f"Page limit exceeded, skipping: {relative_path}")
failed_count += 1
continue
try:
content, content_hash = await _compute_file_content_hash(
temp_path, filename, search_space_id
)
except Exception as e:
logger.warning(f"Could not read {relative_path}: {e}")
failed_count += 1
errors.append(f"{filename}: {e}")
continue
if not content.strip():
failed_count += 1
continue
existing = await check_document_by_unique_identifier(
session, uid_hash
)
if existing:
if existing.content_hash == content_hash:
meta = dict(existing.document_metadata or {})
meta["mtime"] = datetime.now(UTC).timestamp()
existing.document_metadata = meta
if not DocumentStatus.is_state(
existing.status, DocumentStatus.READY
):
existing.status = DocumentStatus.ready()
await session.commit()
continue
await create_version_snapshot(session, existing)
connector_doc = _build_connector_doc(
title=filename,
content=content,
relative_path=relative_path,
folder_name=folder_name,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
documents = await pipeline.prepare_for_indexing([connector_doc])
if not documents:
failed_count += 1
continue
db_doc = documents[0]
try:
db_doc.folder_id = await _resolve_folder_for_file(
session,
relative_path,
root_folder_id,
search_space_id,
user_id,
)
await session.commit()
except IntegrityError:
await session.rollback()
await session.refresh(db_doc)
await pipeline.index(db_doc, connector_doc, llm)
await session.refresh(db_doc)
doc_meta = dict(db_doc.document_metadata or {})
doc_meta["mtime"] = datetime.now(UTC).timestamp()
db_doc.document_metadata = doc_meta
await session.commit()
if DocumentStatus.is_state(db_doc.status, DocumentStatus.READY):
indexed_count += 1
final_pages = _compute_final_pages(
page_limit_service, estimated_pages, len(content)
)
await page_limit_service.update_page_usage(
user_id, final_pages, allow_exceed=True
)
else:
failed_count += 1
if on_heartbeat_callback and (i + 1) % 5 == 0:
await on_heartbeat_callback(i + 1)
except Exception as e:
logger.exception(
f"Error indexing uploaded file {relative_path}: {e}"
)
await session.rollback()
failed_count += 1
errors.append(f"{filename}: {e}")
finally:
try:
os.unlink(temp_path)
except OSError:
pass
error_summary = None
if errors:
error_summary = (
f"{failed_count} file(s) failed: " + "; ".join(errors[:5])
)
if len(errors) > 5:
error_summary += f" ... and {len(errors) - 5} more"
await task_logger.log_task_success(
log_entry,
f"Upload indexing complete: {indexed_count} indexed, {failed_count} failed",
{"indexed": indexed_count, "failed": failed_count},
)
return indexed_count, failed_count, error_summary
except SQLAlchemyError as e:
logger.exception(f"Database error during uploaded file indexing: {e}")
await session.rollback()
await task_logger.log_task_failure(
log_entry, f"DB error: {e}", "Database error", {}
)
return 0, 0, f"Database error: {e}"
except Exception as e:
logger.exception(f"Error during uploaded file indexing: {e}")
await task_logger.log_task_failure(
log_entry, f"Error: {e}", "Unexpected error", {}
)
return 0, 0, str(e)

View file

@ -30,6 +30,7 @@ export const IPC_CHANNELS = {
FOLDER_SYNC_RENDERER_READY: 'folder-sync:renderer-ready',
FOLDER_SYNC_GET_PENDING_EVENTS: 'folder-sync:get-pending-events',
FOLDER_SYNC_ACK_EVENTS: 'folder-sync:ack-events',
FOLDER_SYNC_LIST_FILES: 'folder-sync:list-files',
BROWSE_FILES: 'browse:files',
READ_LOCAL_FILES: 'browse:read-local-files',
// Auth token sync across windows

View file

@ -19,6 +19,8 @@ import {
markRendererReady,
browseFiles,
readLocalFiles,
listFolderFiles,
type WatchedFolderConfig,
} from '../modules/folder-watcher';
import { getShortcuts, setShortcuts, type ShortcutConfig } from '../modules/shortcuts';
import { getActiveSearchSpaceId, setActiveSearchSpaceId } from '../modules/active-search-space';
@ -91,6 +93,10 @@ export function registerIpcHandlers(): void {
acknowledgeFileEvents(eventIds)
);
ipcMain.handle(IPC_CHANNELS.FOLDER_SYNC_LIST_FILES, (_event, config: WatchedFolderConfig) =>
listFolderFiles(config)
);
ipcMain.handle(IPC_CHANNELS.BROWSE_FILES, () => browseFiles());
ipcMain.handle(IPC_CHANNELS.READ_LOCAL_FILES, (_event, paths: string[]) =>

View file

@ -188,6 +188,31 @@ function walkFolderMtimes(config: WatchedFolderConfig): MtimeMap {
return result;
}
export interface FolderFileEntry {
relativePath: string;
fullPath: string;
size: number;
mtimeMs: number;
}
export function listFolderFiles(config: WatchedFolderConfig): FolderFileEntry[] {
const root = config.path;
const mtimeMap = walkFolderMtimes(config);
const entries: FolderFileEntry[] = [];
for (const [relativePath, mtimeMs] of Object.entries(mtimeMap)) {
const fullPath = path.join(root, relativePath);
try {
const stat = fs.statSync(fullPath);
entries.push({ relativePath, fullPath, size: stat.size, mtimeMs });
} catch {
// File may have been removed between walk and stat
}
}
return entries;
}
function getMainWindow(): BrowserWindow | null {
const windows = BrowserWindow.getAllWindows();
return windows.length > 0 ? windows[0] : null;

View file

@ -64,6 +64,7 @@ contextBridge.exposeInMainWorld('electronAPI', {
signalRendererReady: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_RENDERER_READY),
getPendingFileEvents: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_GET_PENDING_EVENTS),
acknowledgeFileEvents: (eventIds: string[]) => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_ACK_EVENTS, eventIds),
listFolderFiles: (config: any) => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_LIST_FILES, config),
// Browse files via native dialog
browseFiles: () => ipcRenderer.invoke(IPC_CHANNELS.BROWSE_FILES),

View file

@ -23,7 +23,9 @@ import { FolderPickerDialog } from "@/components/documents/FolderPickerDialog";
import { FolderTreeView } from "@/components/documents/FolderTreeView";
import { VersionHistoryDialog } from "@/components/documents/version-history";
import { EXPORT_FILE_EXTENSIONS } from "@/components/shared/ExportMenuItems";
import { FolderWatchDialog, type SelectedFolder } from "@/components/sources/FolderWatchDialog";
import { DEFAULT_EXCLUDE_PATTERNS, FolderWatchDialog, type SelectedFolder } from "@/components/sources/FolderWatchDialog";
import { uploadFolderScan } from "@/lib/folder-sync-upload";
import { getSupportedExtensionsSet } from "@/lib/supported-extensions";
import {
AlertDialog,
AlertDialogAction,
@ -304,14 +306,17 @@ export function DocumentsSidebar({
}
try {
await documentsApiService.folderIndex(searchSpaceId, {
folder_path: matched.path,
folder_name: matched.name,
search_space_id: searchSpaceId,
root_folder_id: folder.id,
file_extensions: matched.fileExtensions ?? undefined,
toast.info(`Re-scanning folder: ${matched.name}`);
await uploadFolderScan({
folderPath: matched.path,
folderName: matched.name,
searchSpaceId,
excludePatterns: matched.excludePatterns ?? DEFAULT_EXCLUDE_PATTERNS,
fileExtensions: matched.fileExtensions ?? Array.from(getSupportedExtensionsSet()),
enableSummary: false,
rootFolderId: folder.id,
});
toast.success(`Re-scanning folder: ${matched.name}`);
toast.success(`Re-scan complete: ${matched.name}`);
} catch (err) {
toast.error((err as Error)?.message || "Failed to re-scan folder");
}

View file

@ -1,7 +1,7 @@
"use client";
import { X } from "lucide-react";
import { useCallback, useEffect, useMemo, useState } from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
import { Button } from "@/components/ui/button";
import {
@ -13,8 +13,8 @@ import {
} from "@/components/ui/dialog";
import { Spinner } from "@/components/ui/spinner";
import { Switch } from "@/components/ui/switch";
import { documentsApiService } from "@/lib/apis/documents-api.service";
import { getSupportedExtensionsSet } from "@/lib/supported-extensions";
import { type FolderSyncProgress, uploadFolderScan } from "@/lib/folder-sync-upload";
export interface SelectedFolder {
path: string;
@ -29,7 +29,7 @@ interface FolderWatchDialogProps {
initialFolder?: SelectedFolder | null;
}
const DEFAULT_EXCLUDE_PATTERNS = [
export const DEFAULT_EXCLUDE_PATTERNS = [
".git",
"node_modules",
"__pycache__",
@ -48,6 +48,8 @@ export function FolderWatchDialog({
const [selectedFolder, setSelectedFolder] = useState<SelectedFolder | null>(null);
const [shouldSummarize, setShouldSummarize] = useState(false);
const [submitting, setSubmitting] = useState(false);
const [progress, setProgress] = useState<FolderSyncProgress | null>(null);
const abortRef = useRef<AbortController | null>(null);
useEffect(() => {
if (open && initialFolder) {
@ -68,29 +70,38 @@ export function FolderWatchDialog({
setSelectedFolder({ path: folderPath, name: folderName });
}, []);
const handleCancel = useCallback(() => {
abortRef.current?.abort();
}, []);
const handleSubmit = useCallback(async () => {
if (!selectedFolder) return;
const api = window.electronAPI;
if (!api) return;
const controller = new AbortController();
abortRef.current = controller;
setSubmitting(true);
try {
const result = await documentsApiService.folderIndex(searchSpaceId, {
folder_path: selectedFolder.path,
folder_name: selectedFolder.name,
search_space_id: searchSpaceId,
enable_summary: shouldSummarize,
file_extensions: supportedExtensions,
});
setProgress(null);
const rootFolderId = (result as { root_folder_id?: number })?.root_folder_id ?? null;
try {
const rootFolderId = await uploadFolderScan({
folderPath: selectedFolder.path,
folderName: selectedFolder.name,
searchSpaceId,
excludePatterns: DEFAULT_EXCLUDE_PATTERNS,
fileExtensions: supportedExtensions,
enableSummary: shouldSummarize,
onProgress: setProgress,
signal: controller.signal,
});
await api.addWatchedFolder({
path: selectedFolder.path,
name: selectedFolder.name,
excludePatterns: DEFAULT_EXCLUDE_PATTERNS,
fileExtensions: supportedExtensions,
rootFolderId,
rootFolderId: rootFolderId ?? null,
searchSpaceId,
active: true,
});
@ -98,12 +109,19 @@ export function FolderWatchDialog({
toast.success(`Watching folder: ${selectedFolder.name}`);
setSelectedFolder(null);
setShouldSummarize(false);
setProgress(null);
onOpenChange(false);
onSuccess?.();
} catch (err) {
toast.error((err as Error)?.message || "Failed to watch folder");
if ((err as Error)?.name === "AbortError") {
toast.info("Folder sync cancelled. Partial progress was saved.");
} else {
toast.error((err as Error)?.message || "Failed to watch folder");
}
} finally {
abortRef.current = null;
setSubmitting(false);
setProgress(null);
}
}, [
selectedFolder,
@ -119,12 +137,31 @@ export function FolderWatchDialog({
if (!nextOpen && !submitting) {
setSelectedFolder(null);
setShouldSummarize(false);
setProgress(null);
}
onOpenChange(nextOpen);
},
[onOpenChange, submitting]
);
const progressLabel = useMemo(() => {
if (!progress) return null;
switch (progress.phase) {
case "listing":
return "Scanning folder...";
case "checking":
return `Checking ${progress.total} file(s)...`;
case "uploading":
return `Uploading ${progress.uploaded}/${progress.total} file(s)...`;
case "finalizing":
return "Finalizing...";
case "done":
return "Done!";
default:
return null;
}
}, [progress]);
return (
<Dialog open={open} onOpenChange={handleOpenChange}>
<DialogContent className="sm:max-w-md select-none">
@ -174,14 +211,39 @@ export function FolderWatchDialog({
<Switch checked={shouldSummarize} onCheckedChange={setShouldSummarize} />
</div>
<Button className="w-full relative" onClick={handleSubmit} disabled={submitting}>
<span className={submitting ? "invisible" : ""}>Start Folder Sync</span>
{submitting && (
<span className="absolute inset-0 flex items-center justify-center">
<Spinner size="sm" />
</span>
{progressLabel && (
<div className="rounded-lg bg-slate-400/5 dark:bg-white/5 px-3 py-2">
<p className="text-xs text-muted-foreground">{progressLabel}</p>
{progress && progress.phase === "uploading" && progress.total > 0 && (
<div className="mt-1.5 h-1.5 w-full rounded-full bg-muted overflow-hidden">
<div
className="h-full bg-primary rounded-full transition-[width] duration-300"
style={{ width: `${Math.round((progress.uploaded / progress.total) * 100)}%` }}
/>
</div>
)}
</div>
)}
<div className="flex gap-2">
{submitting ? (
<>
<Button variant="outline" className="flex-1" onClick={handleCancel}>
Cancel
</Button>
<Button className="flex-1 relative" disabled>
<span className="invisible">Syncing...</span>
<span className="absolute inset-0 flex items-center justify-center">
<Spinner size="sm" />
</span>
</Button>
</>
) : (
<Button className="w-full" onClick={handleSubmit}>
Start Folder Sync
</Button>
)}
</Button>
</div>
</>
)}
</div>

View file

@ -20,12 +20,18 @@ const DEBOUNCE_MS = 2000;
const MAX_WAIT_MS = 10_000;
const MAX_BATCH_SIZE = 50;
interface FileEntry {
fullPath: string;
relativePath: string;
action: string;
}
interface BatchItem {
folderPath: string;
folderName: string;
searchSpaceId: number;
rootFolderId: number | null;
filePaths: string[];
files: FileEntry[];
ackIds: string[];
}
@ -44,18 +50,40 @@ export function useFolderSync() {
while (queueRef.current.length > 0) {
const batch = queueRef.current.shift()!;
try {
await documentsApiService.folderIndexFiles(batch.searchSpaceId, {
folder_path: batch.folderPath,
folder_name: batch.folderName,
search_space_id: batch.searchSpaceId,
target_file_paths: batch.filePaths,
root_folder_id: batch.rootFolderId,
});
const addChangeFiles = batch.files.filter((f) => f.action === "add" || f.action === "change");
const unlinkFiles = batch.files.filter((f) => f.action === "unlink");
if (addChangeFiles.length > 0 && electronAPI?.readLocalFiles) {
const fullPaths = addChangeFiles.map((f) => f.fullPath);
const fileDataArr = await electronAPI.readLocalFiles(fullPaths);
const files: File[] = fileDataArr.map((fd) => {
const blob = new Blob([fd.data], { type: fd.mimeType || "application/octet-stream" });
return new File([blob], fd.name, { type: blob.type });
});
await documentsApiService.folderUploadFiles(files, {
folder_name: batch.folderName,
search_space_id: batch.searchSpaceId,
relative_paths: addChangeFiles.map((f) => f.relativePath),
root_folder_id: batch.rootFolderId,
});
}
if (unlinkFiles.length > 0) {
await documentsApiService.folderNotifyUnlinked({
folder_name: batch.folderName,
search_space_id: batch.searchSpaceId,
root_folder_id: batch.rootFolderId,
relative_paths: unlinkFiles.map((f) => f.relativePath),
});
}
if (electronAPI?.acknowledgeFileEvents && batch.ackIds.length > 0) {
await electronAPI.acknowledgeFileEvents(batch.ackIds);
}
} catch (err) {
console.error("[FolderSync] Failed to trigger batch re-index:", err);
console.error("[FolderSync] Failed to process batch:", err);
}
}
processingRef.current = false;
@ -68,10 +96,10 @@ export function useFolderSync() {
if (!pending) return;
pendingByFolder.current.delete(folderKey);
for (let i = 0; i < pending.filePaths.length; i += MAX_BATCH_SIZE) {
for (let i = 0; i < pending.files.length; i += MAX_BATCH_SIZE) {
queueRef.current.push({
...pending,
filePaths: pending.filePaths.slice(i, i + MAX_BATCH_SIZE),
files: pending.files.slice(i, i + MAX_BATCH_SIZE),
ackIds: i === 0 ? pending.ackIds : [],
});
}
@ -83,9 +111,14 @@ export function useFolderSync() {
const existing = pendingByFolder.current.get(folderKey);
if (existing) {
const pathSet = new Set(existing.filePaths);
pathSet.add(event.fullPath);
existing.filePaths = Array.from(pathSet);
const pathSet = new Set(existing.files.map((f) => f.fullPath));
if (!pathSet.has(event.fullPath)) {
existing.files.push({
fullPath: event.fullPath,
relativePath: event.relativePath,
action: event.action,
});
}
if (!existing.ackIds.includes(event.id)) {
existing.ackIds.push(event.id);
}
@ -95,7 +128,11 @@ export function useFolderSync() {
folderName: event.folderName,
searchSpaceId: event.searchSpaceId,
rootFolderId: event.rootFolderId,
filePaths: [event.fullPath],
files: [{
fullPath: event.fullPath,
relativePath: event.relativePath,
action: event.action,
}],
ackIds: [event.id],
});
firstEventTime.current.set(folderKey, Date.now());

View file

@ -453,6 +453,76 @@ class DocumentsApiService {
return baseApiService.post(`/api/v1/documents/folder-index-files`, undefined, { body });
};
folderMtimeCheck = async (body: {
folder_name: string;
search_space_id: number;
files: { relative_path: string; mtime: number }[];
}): Promise<{ files_to_upload: string[] }> => {
return baseApiService.post(`/api/v1/documents/folder-mtime-check`, undefined, { body }) as unknown as { files_to_upload: string[] };
};
folderUploadFiles = async (
files: File[],
metadata: {
folder_name: string;
search_space_id: number;
relative_paths: string[];
root_folder_id?: number | null;
enable_summary?: boolean;
},
signal?: AbortSignal,
): Promise<{ message: string; status: string; root_folder_id: number; file_count: number }> => {
const formData = new FormData();
for (const file of files) {
formData.append("files", file);
}
formData.append("folder_name", metadata.folder_name);
formData.append("search_space_id", String(metadata.search_space_id));
formData.append("relative_paths", JSON.stringify(metadata.relative_paths));
if (metadata.root_folder_id != null) {
formData.append("root_folder_id", String(metadata.root_folder_id));
}
formData.append("enable_summary", String(metadata.enable_summary ?? false));
const totalSize = files.reduce((acc, f) => acc + f.size, 0);
const timeoutMs = Math.min(Math.max((totalSize / (1024 * 1024)) * 5000, 30_000), 600_000);
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
if (signal) {
signal.addEventListener("abort", () => controller.abort(), { once: true });
}
try {
return await baseApiService.postFormData(
`/api/v1/documents/folder-upload`,
undefined,
{ body: formData, signal: controller.signal },
) as { message: string; status: string; root_folder_id: number; file_count: number };
} finally {
clearTimeout(timeoutId);
}
};
folderNotifyUnlinked = async (body: {
folder_name: string;
search_space_id: number;
root_folder_id: number | null;
relative_paths: string[];
}): Promise<{ deleted_count: number }> => {
return baseApiService.post(`/api/v1/documents/folder-unlink`, undefined, { body }) as unknown as { deleted_count: number };
};
folderSyncFinalize = async (body: {
folder_name: string;
search_space_id: number;
root_folder_id: number | null;
all_relative_paths: string[];
}): Promise<{ deleted_count: number }> => {
return baseApiService.post(`/api/v1/documents/folder-sync-finalize`, undefined, { body }) as unknown as { deleted_count: number };
};
getWatchedFolders = async (searchSpaceId: number) => {
return baseApiService.get(
`/api/v1/documents/watched-folders?search_space_id=${searchSpaceId}`,

View file

@ -0,0 +1,214 @@
import { documentsApiService } from "@/lib/apis/documents-api.service";
const MAX_BATCH_SIZE_BYTES = 20 * 1024 * 1024; // 20 MB
const MAX_BATCH_FILES = 10;
const UPLOAD_CONCURRENCY = 3;
export interface FolderSyncProgress {
phase: "listing" | "checking" | "uploading" | "finalizing" | "done";
uploaded: number;
total: number;
}
export interface FolderSyncParams {
folderPath: string;
folderName: string;
searchSpaceId: number;
excludePatterns: string[];
fileExtensions: string[];
enableSummary: boolean;
rootFolderId?: number | null;
onProgress?: (progress: FolderSyncProgress) => void;
signal?: AbortSignal;
}
function buildBatches(
entries: FolderFileEntry[],
): FolderFileEntry[][] {
const batches: FolderFileEntry[][] = [];
let currentBatch: FolderFileEntry[] = [];
let currentSize = 0;
for (const entry of entries) {
if (entry.size >= MAX_BATCH_SIZE_BYTES) {
if (currentBatch.length > 0) {
batches.push(currentBatch);
currentBatch = [];
currentSize = 0;
}
batches.push([entry]);
continue;
}
if (
currentBatch.length >= MAX_BATCH_FILES ||
currentSize + entry.size > MAX_BATCH_SIZE_BYTES
) {
batches.push(currentBatch);
currentBatch = [];
currentSize = 0;
}
currentBatch.push(entry);
currentSize += entry.size;
}
if (currentBatch.length > 0) {
batches.push(currentBatch);
}
return batches;
}
async function uploadBatchesWithConcurrency(
batches: FolderFileEntry[][],
params: {
folderName: string;
searchSpaceId: number;
rootFolderId: number | null;
enableSummary: boolean;
signal?: AbortSignal;
onBatchComplete?: (filesInBatch: number) => void;
},
) {
const api = window.electronAPI;
if (!api) throw new Error("Electron API not available");
let batchIdx = 0;
const errors: string[] = [];
async function processNext(): Promise<void> {
while (true) {
if (params.signal?.aborted) return;
const idx = batchIdx++;
if (idx >= batches.length) return;
const batch = batches[idx];
const fullPaths = batch.map((e) => e.fullPath);
try {
const fileDataArr = await api.readLocalFiles(fullPaths);
const files: File[] = fileDataArr.map((fd) => {
const blob = new Blob([fd.data], { type: fd.mimeType || "application/octet-stream" });
return new File([blob], fd.name, { type: blob.type });
});
await documentsApiService.folderUploadFiles(
files,
{
folder_name: params.folderName,
search_space_id: params.searchSpaceId,
relative_paths: batch.map((e) => e.relativePath),
root_folder_id: params.rootFolderId,
enable_summary: params.enableSummary,
},
params.signal,
);
params.onBatchComplete?.(batch.length);
} catch (err) {
if (params.signal?.aborted) return;
const msg = (err as Error)?.message || "Upload failed";
errors.push(`Batch ${idx}: ${msg}`);
}
}
}
const workers = Array.from({ length: Math.min(UPLOAD_CONCURRENCY, batches.length) }, () => processNext());
await Promise.all(workers);
if (errors.length > 0 && !params.signal?.aborted) {
console.error("Some batches failed:", errors);
}
}
/**
* Run a full upload-based folder scan: list files, mtime-check, upload
* changed files in parallel batches, and finalize (delete orphans).
*
* Returns the root_folder_id to pass to addWatchedFolder.
*/
export async function uploadFolderScan(params: FolderSyncParams): Promise<number | null> {
const api = window.electronAPI;
if (!api) throw new Error("Electron API not available");
const { folderPath, folderName, searchSpaceId, excludePatterns, fileExtensions, enableSummary, signal } = params;
let rootFolderId = params.rootFolderId ?? null;
params.onProgress?.({ phase: "listing", uploaded: 0, total: 0 });
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
const allFiles = await api.listFolderFiles({
path: folderPath,
name: folderName,
excludePatterns,
fileExtensions,
rootFolderId: rootFolderId ?? null,
searchSpaceId,
active: true,
});
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
params.onProgress?.({ phase: "checking", uploaded: 0, total: allFiles.length });
const mtimeCheckResult = await documentsApiService.folderMtimeCheck({
folder_name: folderName,
search_space_id: searchSpaceId,
files: allFiles.map((f) => ({ relative_path: f.relativePath, mtime: f.mtimeMs / 1000 })),
});
const filesToUpload = mtimeCheckResult.files_to_upload;
const uploadSet = new Set(filesToUpload);
const entriesToUpload = allFiles.filter((f) => uploadSet.has(f.relativePath));
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
if (entriesToUpload.length > 0) {
const batches = buildBatches(entriesToUpload);
let uploaded = 0;
params.onProgress?.({ phase: "uploading", uploaded: 0, total: entriesToUpload.length });
await uploadBatchesWithConcurrency(batches, {
folderName,
searchSpaceId,
rootFolderId: rootFolderId ?? null,
enableSummary,
signal,
onBatchComplete: (count) => {
uploaded += count;
params.onProgress?.({ phase: "uploading", uploaded, total: entriesToUpload.length });
},
});
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
if (!rootFolderId) {
const watchedFolders = await documentsApiService.getWatchedFolders(searchSpaceId);
const folderList = watchedFolders as Array<{ id: number; name: string }> | undefined;
const matched = folderList?.find((f) => f.name === folderName);
if (matched?.id) {
rootFolderId = matched.id;
}
}
}
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
params.onProgress?.({ phase: "finalizing", uploaded: entriesToUpload.length, total: entriesToUpload.length });
await documentsApiService.folderSyncFinalize({
folder_name: folderName,
search_space_id: searchSpaceId,
root_folder_id: rootFolderId ?? null,
all_relative_paths: allFiles.map((f) => f.relativePath),
});
params.onProgress?.({ phase: "done", uploaded: entriesToUpload.length, total: entriesToUpload.length });
return rootFolderId;
}

View file

@ -34,6 +34,13 @@ interface LocalFileData {
size: number;
}
interface FolderFileEntry {
relativePath: string;
fullPath: string;
size: number;
mtimeMs: number;
}
interface ElectronAPI {
versions: {
electron: string;
@ -82,6 +89,7 @@ interface ElectronAPI {
signalRendererReady: () => Promise<void>;
getPendingFileEvents: () => Promise<FolderSyncFileChangedEvent[]>;
acknowledgeFileEvents: (eventIds: string[]) => Promise<{ acknowledged: number }>;
listFolderFiles: (config: WatchedFolderConfig) => Promise<FolderFileEntry[]>;
// Browse files/folders via native dialogs
browseFiles: () => Promise<string[] | null>;
readLocalFiles: (paths: string[]) => Promise<LocalFileData[]>;