feat: implement indexing progress management in the local-folder indexing process and enhance related test coverage

This commit is contained in:
Anish Sarkar 2026-04-08 18:01:55 +05:30
parent a8b83dcf3f
commit 37c52ce7ea
4 changed files with 198 additions and 50 deletions

View file

@ -344,6 +344,27 @@ async def _resolve_folder_for_file(
return current_parent_id
async def _set_indexing_flag(session: AsyncSession, folder_id: int) -> None:
    """Mark a folder as having an indexing run in progress.

    Sets ``indexing_in_progress=True`` in the folder's ``folder_metadata``
    and commits. No-op if the folder id does not resolve to a row.
    """
    target = await session.get(Folder, folder_id)
    if not target:
        return
    # Build a fresh dict and reassign the attribute (rather than mutating
    # in place) so the change to the metadata column is registered.
    updated = dict(target.folder_metadata or {})
    updated["indexing_in_progress"] = True
    target.folder_metadata = updated
    await session.commit()
async def _clear_indexing_flag(session: AsyncSession, folder_id: int) -> None:
    """Best-effort removal of the ``indexing_in_progress`` marker.

    Pops the flag from the folder's ``folder_metadata`` and commits.
    Never raises: this is called from cleanup paths (``finally`` blocks),
    where a secondary failure must not mask the original error.
    """
    try:
        folder = await session.get(Folder, folder_id)
        if folder:
            # Reassign a copied dict so the metadata column change is registered.
            meta = dict(folder.folder_metadata or {})
            meta.pop("indexing_in_progress", None)
            folder.folder_metadata = meta
            await session.commit()
    except Exception:
        # Deliberately non-fatal, but log it: a silently swallowed failure
        # here leaves the folder permanently marked as "indexing" with no
        # trace of why.
        logger.warning(
            "Failed to clear indexing_in_progress flag for folder %s",
            folder_id,
            exc_info=True,
        )
async def _cleanup_empty_folder_chain(
session: AsyncSession,
folder_id: int,
@ -531,44 +552,50 @@ async def index_local_folder(
# BATCH MODE (1..N files)
# ====================================================================
if target_file_paths:
if len(target_file_paths) == 1:
indexed, skipped, err = await _index_single_file(
session=session,
if root_folder_id:
await _set_indexing_flag(session, root_folder_id)
try:
if len(target_file_paths) == 1:
indexed, skipped, err = await _index_single_file(
session=session,
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_path=target_file_paths[0],
enable_summary=enable_summary,
root_folder_id=root_folder_id,
task_logger=task_logger,
log_entry=log_entry,
)
return indexed, skipped, root_folder_id, err
indexed, failed, err = await _index_batch_files(
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_path=target_file_paths[0],
target_file_paths=target_file_paths,
enable_summary=enable_summary,
root_folder_id=root_folder_id,
task_logger=task_logger,
log_entry=log_entry,
on_progress_callback=on_heartbeat_callback,
)
return indexed, skipped, root_folder_id, err
indexed, failed, err = await _index_batch_files(
search_space_id=search_space_id,
user_id=user_id,
folder_path=folder_path,
folder_name=folder_name,
target_file_paths=target_file_paths,
enable_summary=enable_summary,
root_folder_id=root_folder_id,
on_progress_callback=on_heartbeat_callback,
)
if err:
await task_logger.log_task_success(
log_entry,
f"Batch indexing: {indexed} indexed, {failed} failed",
{"indexed": indexed, "failed": failed},
)
else:
await task_logger.log_task_success(
log_entry,
f"Batch indexing complete: {indexed} indexed",
{"indexed": indexed, "failed": failed},
)
return indexed, failed, root_folder_id, err
if err:
await task_logger.log_task_success(
log_entry,
f"Batch indexing: {indexed} indexed, {failed} failed",
{"indexed": indexed, "failed": failed},
)
else:
await task_logger.log_task_success(
log_entry,
f"Batch indexing complete: {indexed} indexed",
{"indexed": indexed, "failed": failed},
)
return indexed, failed, root_folder_id, err
finally:
if root_folder_id:
await _clear_indexing_flag(session, root_folder_id)
# ====================================================================
# FULL-SCAN MODE
@ -588,6 +615,7 @@ async def index_local_folder(
exclude_patterns=exclude_patterns,
)
await session.flush()
await _set_indexing_flag(session, root_folder_id)
try:
files = scan_folder(folder_path, file_extensions, exclude_patterns)
@ -595,6 +623,7 @@ async def index_local_folder(
await task_logger.log_task_failure(
log_entry, f"Failed to scan folder: {e}", "Scan error", {}
)
await _clear_indexing_flag(session, root_folder_id)
return 0, 0, root_folder_id, f"Failed to scan folder: {e}"
logger.info(f"Found {len(files)} files in folder")
@ -882,6 +911,7 @@ async def index_local_folder(
},
)
await _clear_indexing_flag(session, root_folder_id)
return indexed_count, skipped_count, root_folder_id, warning_message
except SQLAlchemyError as e:
@ -890,6 +920,8 @@ async def index_local_folder(
await task_logger.log_task_failure(
log_entry, f"DB error: {e}", "Database error", {}
)
if root_folder_id:
await _clear_indexing_flag(session, root_folder_id)
return 0, 0, root_folder_id, f"Database error: {e}"
except Exception as e:
@ -897,6 +929,8 @@ async def index_local_folder(
await task_logger.log_task_failure(
log_entry, f"Error: {e}", "Unexpected error", {}
)
if root_folder_id:
await _clear_indexing_flag(session, root_folder_id)
return 0, 0, root_folder_id, str(e)
@ -1261,12 +1295,7 @@ async def index_uploaded_files(
)
await session.flush()
root_folder = await session.get(Folder, root_folder_id)
if root_folder:
meta = dict(root_folder.folder_metadata or {})
meta["indexing_in_progress"] = True
root_folder.folder_metadata = meta
await session.commit()
await _set_indexing_flag(session, root_folder_id)
page_limit_service = PageLimitService(session)
pipeline = IndexingPipelineService(session)
@ -1443,12 +1472,4 @@ async def index_uploaded_files(
return 0, 0, str(e)
finally:
try:
root_folder = await session.get(Folder, root_folder_id)
if root_folder:
meta = dict(root_folder.folder_metadata or {})
meta.pop("indexing_in_progress", None)
root_folder.folder_metadata = meta
await session.commit()
except Exception:
pass
await _clear_indexing_flag(session, root_folder_id)

View file

@ -1,4 +1,4 @@
"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F7), Tier 5 (P1), Tier 6 (B1-B2)."""
"""Integration tests for local folder indexer — Tier 3 (I1-I5), Tier 4 (F1-F7), Tier 5 (P1), Tier 6 (B1-B2), Tier 7 (IP1-IP3)."""
import os
from contextlib import asynccontextmanager
@ -1178,3 +1178,129 @@ class TestPageLimits:
await db_session.refresh(db_user)
assert db_user.pages_used > 0
assert db_user.pages_used <= db_user.pages_limit + 1
# ====================================================================
# Tier 7: Indexing Progress Flag (IP1-IP3)
# ====================================================================
class TestIndexingProgressFlag:
    """Tier 7: lifecycle of the ``indexing_in_progress`` marker.

    The indexer is expected to set ``indexing_in_progress`` in the root
    folder's ``folder_metadata`` while a run is active and to remove it
    once the run finishes, in both full-scan and single-file modes.
    """

    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
    async def test_ip1_full_scan_clears_flag(
        self,
        db_session: AsyncSession,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """IP1: Full-scan mode clears indexing_in_progress after completion."""
        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder

        (tmp_path / "note.md").write_text("# Hello\n\nContent.")
        _, _, root_folder_id, _ = await index_local_folder(
            session=db_session,
            search_space_id=db_search_space.id,
            user_id=str(db_user.id),
            folder_path=str(tmp_path),
            folder_name="test-folder",
        )
        assert root_folder_id is not None
        # Re-read the root folder and confirm the transient flag was removed.
        root_folder = (
            await db_session.execute(select(Folder).where(Folder.id == root_folder_id))
        ).scalar_one()
        meta = root_folder.folder_metadata or {}
        assert "indexing_in_progress" not in meta

    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
    async def test_ip2_single_file_clears_flag(
        self,
        db_session: AsyncSession,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """IP2: Single-file (Chokidar) mode clears indexing_in_progress after completion."""
        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder

        # First run: full scan, to create the root folder we then re-index into.
        (tmp_path / "root.md").write_text("root")
        _, _, root_folder_id, _ = await index_local_folder(
            session=db_session,
            search_space_id=db_search_space.id,
            user_id=str(db_user.id),
            folder_path=str(tmp_path),
            folder_name="test-folder",
        )
        # Second run: targeted single-file mode against the existing root folder.
        (tmp_path / "new.md").write_text("new file content")
        await index_local_folder(
            session=db_session,
            search_space_id=db_search_space.id,
            user_id=str(db_user.id),
            folder_path=str(tmp_path),
            folder_name="test-folder",
            target_file_paths=[str(tmp_path / "new.md")],
            root_folder_id=root_folder_id,
        )
        root_folder = (
            await db_session.execute(select(Folder).where(Folder.id == root_folder_id))
        ).scalar_one()
        meta = root_folder.folder_metadata or {}
        assert "indexing_in_progress" not in meta

    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
    async def test_ip3_flag_set_during_indexing(
        self,
        db_session: AsyncSession,
        db_user: User,
        db_search_space: SearchSpace,
        tmp_path: Path,
    ):
        """IP3: indexing_in_progress is True on the root folder while indexing is running."""
        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder

        (tmp_path / "note.md").write_text("# Check flag\n\nDuring indexing.")
        from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService

        # Monkeypatch the pipeline's index() so we can observe the flag
        # mid-run; the original is restored in the finally block below.
        original_index = IndexingPipelineService.index
        flag_observed = []

        async def patched_index(self_pipe, document, connector_doc, llm):
            # Look up the root folder (parent_id IS NULL) of this search
            # space and record whether the flag is set at this moment.
            # NOTE(review): this queries through the test's db_session, not
            # the indexer's session — assumes both see the committed flag.
            folder = (
                await db_session.execute(
                    select(Folder).where(
                        Folder.search_space_id == db_search_space.id,
                        Folder.parent_id.is_(None),
                    )
                )
            ).scalar_one_or_none()
            if folder:
                meta = folder.folder_metadata or {}
                flag_observed.append(meta.get("indexing_in_progress", False))
            return await original_index(self_pipe, document, connector_doc, llm)

        IndexingPipelineService.index = patched_index
        try:
            _, _, root_folder_id, _ = await index_local_folder(
                session=db_session,
                search_space_id=db_search_space.id,
                user_id=str(db_user.id),
                folder_path=str(tmp_path),
                folder_name="test-folder",
            )
        finally:
            # Always undo the monkeypatch so other tests see the real method.
            IndexingPipelineService.index = original_index
        assert len(flag_observed) > 0, "index() should have been called at least once"
        assert all(flag_observed), "indexing_in_progress should be True during indexing"
        # After the run completes the flag must have been cleared again.
        root_folder = (
            await db_session.execute(select(Folder).where(Folder.id == root_folder_id))
        ).scalar_one()
        meta = root_folder.folder_metadata or {}
        assert "indexing_in_progress" not in meta

View file

@ -1,7 +1,7 @@
"use client";
import { Slottable } from "@radix-ui/react-slot";
import { type ComponentPropsWithRef, forwardRef, type ReactNode } from "react";
import { type ComponentPropsWithRef, forwardRef, type ReactNode, useState } from "react";
import { Button } from "@/components/ui/button";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
import { useMediaQuery } from "@/hooks/use-media-query";
@ -17,9 +17,10 @@ export const TooltipIconButton = forwardRef<HTMLButtonElement, TooltipIconButton
({ children, tooltip, side = "bottom", className, disableTooltip, ...rest }, ref) => {
const isTouchDevice = useMediaQuery("(pointer: coarse)");
const suppressTooltip = disableTooltip || isTouchDevice;
const [tooltipOpen, setTooltipOpen] = useState(false);
return (
<Tooltip open={suppressTooltip ? false : undefined}>
<Tooltip open={suppressTooltip ? false : tooltipOpen} onOpenChange={suppressTooltip ? undefined : setTooltipOpen}>
<TooltipTrigger asChild>
<Button
variant="ghost"

View file

@ -167,7 +167,7 @@ export function FolderWatchDialog({
<DialogContent className="sm:max-w-md select-none">
<DialogHeader>
<DialogTitle>Watch Local Folder</DialogTitle>
<DialogDescription>Select a folder to sync and watch for changes.</DialogDescription>
<DialogDescription>Select a folder to sync and watch for changes</DialogDescription>
</DialogHeader>
<div className="space-y-3 pt-2 min-h-[13rem]">