feat: implement page limit checks in local folder indexing to manage user page usage

This commit is contained in:
Anish Sarkar 2026-04-03 19:13:25 +05:30
parent 5068a6b4f3
commit 9c0af6569d
2 changed files with 303 additions and 4 deletions

View file

@ -34,6 +34,7 @@ from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker
from app.utils.document_versioning import create_version_snapshot
@ -171,6 +172,39 @@ def _needs_etl(filename: str) -> bool:
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
def _estimate_pages_safe(page_limit_service: PageLimitService, file_path: str) -> int:
"""Estimate page count with a file-size fallback."""
try:
return page_limit_service.estimate_pages_before_processing(file_path)
except Exception:
file_size = os.path.getsize(file_path)
return max(1, file_size // (80 * 1024))
async def _check_page_limit_or_skip(
page_limit_service: PageLimitService,
user_id: str,
file_path: str,
) -> int:
"""Estimate pages and check the limit; raises PageLimitExceededError if over quota.
Returns the estimated page count on success.
"""
estimated = _estimate_pages_safe(page_limit_service, file_path)
await page_limit_service.check_page_limit(user_id, estimated)
return estimated
def _compute_final_pages(
page_limit_service: PageLimitService,
estimated_pages: int,
content_length: int,
) -> int:
"""Return the final page count as max(estimated, actual)."""
actual = page_limit_service.estimate_pages_from_content_length(content_length)
return max(estimated_pages, actual)
DEFAULT_EXCLUDE_PATTERNS = [
".git",
"node_modules",
@ -720,11 +754,12 @@ async def index_local_folder(
skipped_count = 0
failed_count = 0
page_limit_service = PageLimitService(session)
# ================================================================
# PHASE 1: Pre-filter files (mtime / content-hash), version changed
# ================================================================
connector_docs: list[ConnectorDocument] = []
# Maps unique_id -> (relative_path, mtime) for post-pipeline folder_id assignment
file_meta_map: dict[str, dict] = {}
seen_unique_hashes: set[str] = set()
@ -759,6 +794,17 @@ async def index_local_folder(
skipped_count += 1
continue
try:
estimated_pages = await _check_page_limit_or_skip(
page_limit_service, user_id, file_path_abs
)
except PageLimitExceededError:
logger.warning(
f"Page limit exceeded, skipping: {file_path_abs}"
)
failed_count += 1
continue
try:
content, content_hash = await _compute_file_content_hash(
file_path_abs, file_info["relative_path"], search_space_id
@ -781,6 +827,17 @@ async def index_local_folder(
await create_version_snapshot(session, existing_document)
else:
try:
estimated_pages = await _check_page_limit_or_skip(
page_limit_service, user_id, file_path_abs
)
except PageLimitExceededError:
logger.warning(
f"Page limit exceeded, skipping: {file_path_abs}"
)
failed_count += 1
continue
try:
content, content_hash = await _compute_file_content_hash(
file_path_abs, file_info["relative_path"], search_space_id
@ -807,6 +864,8 @@ async def index_local_folder(
file_meta_map[unique_identifier] = {
"relative_path": relative_path,
"mtime": file_info["modified_at"].timestamp(),
"estimated_pages": estimated_pages,
"content_length": len(content),
}
except Exception as e:
@ -901,6 +960,15 @@ async def index_local_folder(
doc_meta = dict(result.document_metadata or {})
doc_meta["mtime"] = mtime_info.get("mtime")
result.document_metadata = doc_meta
est = mtime_info.get("estimated_pages", 1)
content_len = mtime_info.get("content_length", 0)
final_pages = _compute_final_pages(
page_limit_service, est, content_len
)
await page_limit_service.update_page_usage(
user_id, final_pages, allow_exceed=True
)
else:
failed_count += 1
@ -1084,6 +1152,14 @@ async def _index_single_file(
DocumentType.LOCAL_FOLDER_FILE.value, unique_id, search_space_id
)
page_limit_service = PageLimitService(session)
try:
estimated_pages = await _check_page_limit_or_skip(
page_limit_service, user_id, str(full_path)
)
except PageLimitExceededError as e:
return 0, 1, f"Page limit exceeded: {e}"
try:
content, content_hash = await _compute_file_content_hash(
str(full_path), full_path.name, search_space_id
@ -1128,8 +1204,6 @@ async def _index_single_file(
db_doc = documents[0]
# Assign folder_id before indexing so the doc appears in the
# correct folder while still pending/processing.
if root_folder_id:
try:
db_doc.folder_id = await _resolve_folder_for_file(
@ -1154,10 +1228,16 @@ async def _index_single_file(
failed_msg = None if indexed else "Indexing failed"
if indexed:
final_pages = _compute_final_pages(
page_limit_service, estimated_pages, len(content)
)
await page_limit_service.update_page_usage(
user_id, final_pages, allow_exceed=True
)
await task_logger.log_task_success(
log_entry,
f"Single file indexed: {rel_path}",
{"file": rel_path},
{"file": rel_path, "pages_processed": final_pages},
)
return indexed, 0 if indexed else 1, failed_msg

View file

@ -959,3 +959,222 @@ class TestDirectConvert:
assert "| name" in doc.source_markdown
assert "name,age,city" not in doc.source_markdown
# ====================================================================
# Tier 8: Page Limits (PL1-PL6)
# ====================================================================
class TestPageLimits:
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
async def test_pl1_full_scan_increments_pages_used(
self,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
):
"""PL1: Successful full-scan sync increments user.pages_used."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
db_user.pages_used = 0
db_user.pages_limit = 500
await db_session.flush()
(tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
count, _skipped, _root_folder_id, err = await index_local_folder(
session=db_session,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
assert err is None
assert count == 1
await db_session.refresh(db_user)
assert db_user.pages_used > 0, "pages_used should increase after indexing"
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
async def test_pl2_full_scan_blocked_when_limit_exhausted(
self,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
):
"""PL2: Full-scan skips file when page limit is exhausted."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
db_user.pages_used = 100
db_user.pages_limit = 100
await db_session.flush()
(tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
count, _skipped, _root_folder_id, err = await index_local_folder(
session=db_session,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
assert count == 0
await db_session.refresh(db_user)
assert db_user.pages_used == 100, "pages_used should not change on rejection"
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
async def test_pl3_single_file_increments_pages_used(
self,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
):
"""PL3: Single-file mode increments user.pages_used on success."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
db_user.pages_used = 0
db_user.pages_limit = 500
await db_session.flush()
(tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
count, _skipped, _root_folder_id, err = await index_local_folder(
session=db_session,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
target_file_paths=[str(tmp_path / "note.md")],
)
assert err is None
assert count == 1
await db_session.refresh(db_user)
assert db_user.pages_used > 0, "pages_used should increase after indexing"
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
async def test_pl4_single_file_blocked_when_limit_exhausted(
self,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
):
"""PL4: Single-file mode skips file when page limit is exhausted."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
db_user.pages_used = 100
db_user.pages_limit = 100
await db_session.flush()
(tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
count, _skipped, _root_folder_id, err = await index_local_folder(
session=db_session,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
target_file_paths=[str(tmp_path / "note.md")],
)
assert count == 0
assert err is not None
assert "page limit" in err.lower()
await db_session.refresh(db_user)
assert db_user.pages_used == 100, "pages_used should not change on rejection"
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
async def test_pl5_unchanged_resync_no_extra_pages(
self,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
):
"""PL5: Re-syncing an unchanged file does not consume additional pages."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
db_user.pages_used = 0
db_user.pages_limit = 500
await db_session.flush()
(tmp_path / "note.md").write_text("# Hello\n\nSame content.")
count1, _, root_folder_id, _ = await index_local_folder(
session=db_session,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
)
assert count1 == 1
await db_session.refresh(db_user)
pages_after_first = db_user.pages_used
assert pages_after_first > 0
count2, _, _, _ = await index_local_folder(
session=db_session,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
root_folder_id=root_folder_id,
)
assert count2 == 0
await db_session.refresh(db_user)
assert db_user.pages_used == pages_after_first, (
"pages_used should not increase for unchanged files"
)
@pytest.mark.usefixtures(*UNIFIED_FIXTURES)
async def test_pl6_batch_partial_page_limit_exhaustion(
self,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
tmp_path: Path,
patched_batch_sessions,
):
"""PL6: Batch mode with a very low page limit: some files succeed, rest fail."""
from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
db_user.pages_used = 0
db_user.pages_limit = 1
await db_session.flush()
(tmp_path / "a.md").write_text("File A content")
(tmp_path / "b.md").write_text("File B content")
(tmp_path / "c.md").write_text("File C content")
count, failed, _root_folder_id, _err = await index_local_folder(
session=db_session,
search_space_id=db_search_space.id,
user_id=str(db_user.id),
folder_path=str(tmp_path),
folder_name="test-folder",
target_file_paths=[
str(tmp_path / "a.md"),
str(tmp_path / "b.md"),
str(tmp_path / "c.md"),
],
)
assert count >= 1, "at least one file should succeed"
assert failed >= 1, "at least one file should fail due to page limit"
assert count + failed == 3
await db_session.refresh(db_user)
assert db_user.pages_used > 0
assert db_user.pages_used <= db_user.pages_limit + 1