Merge remote-tracking branch 'upstream/dev' into feat/unified-etl-pipeline

commit 63a75052ca
Author: Anish Sarkar
Date: 2026-04-06 22:04:51 +05:30

76 changed files with 3041 additions and 376 deletions

@@ -28,6 +28,7 @@ from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
@@ -396,6 +397,12 @@ async def _index_full_scan(
},
)
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
page_limit_reached = False
renamed_count = 0
skipped = 0
files_to_download: list[dict] = []
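
For readers skimming the diff: the PageLimitService calls used throughout these hunks imply roughly the interface sketched below. This is a reconstruction from the call sites only, not the actual implementation in app/services/page_limit_service.py, and the real signatures may differ.

# Sketch of the PageLimitService surface, inferred from the call sites in
# this diff -- hypothetical, shown only to make the hunks easier to follow.
class PageLimitService:
    def __init__(self, session):  # the task's database session
        self.session = session

    async def get_page_usage(self, user_id) -> tuple[int, int]:
        """Return (pages_used, pages_limit) for the user."""
        ...

    async def check_page_limit(self, user_id, estimated_pages: int) -> None:
        """Raise if indexing estimated_pages would exceed the quota."""
        ...

    async def update_page_usage(self, user_id, pages: int, allow_exceed: bool = False) -> None:
        """Record pages against the quota; allow_exceed permits overshoot."""
        ...

    @staticmethod
    def estimate_pages_from_metadata(name: str, size) -> int:
        """Cheap page estimate from filename and byte size, pre-download."""
        ...

    @staticmethod
    def estimate_pages_from_content_length(content_length: int) -> int:
        """Page estimate from the extracted content's length."""
        ...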
@@ -425,6 +432,21 @@ async def _index_full_scan(
elif skip_item(file):
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
if not page_limit_reached:
logger.warning(
"Page limit reached during Dropbox full scan, "
"skipping remaining files"
)
page_limit_reached = True
skipped += 1
continue
batch_estimated_pages += file_pages
files_to_download.append(file)
batch_indexed, failed = await _download_and_index(
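
Distilled out of the surrounding context, the gate that this and the matching hunks add to every scan loop has the same shape. The sketch below is illustrative only; listing, skipped, and logger stand in for the loop's real locals.

# Illustrative shape of the per-file quota gate added in these hunks.
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
page_limit_reached = False
skipped = 0
files_to_download: list[dict] = []

for file in listing:
    file_pages = PageLimitService.estimate_pages_from_metadata(
        file.get("name", ""), file.get("size")
    )
    if batch_estimated_pages + file_pages > remaining_quota:
        if not page_limit_reached:  # warn once per scan, not once per file
            logger.warning("Page limit reached, skipping remaining files")
        page_limit_reached = True
        skipped += 1
        continue  # keep iterating so the skipped count stays accurate
    batch_estimated_pages += file_pages
    files_to_download.append(file)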
@@ -438,6 +460,14 @@
on_heartbeat=on_heartbeat_callback,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
@@ -458,6 +488,11 @@ async def _index_selected_files(
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
"""Index user-selected files using the parallel pipeline."""
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
files_to_download: list[dict] = []
errors: list[str] = []
renamed_count = 0
@@ -482,6 +517,15 @@
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
display = file_name or file_path
errors.append(f"File '{display}': page limit would be exceeded")
continue
batch_estimated_pages += file_pages
files_to_download.append(file)
batch_indexed, _failed = await _download_and_index(
@@ -495,6 +539,14 @@
on_heartbeat=on_heartbeat,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
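
Note the behavioral difference from the full-scan path above: when the user picked the files explicitly, an over-quota file is surfaced as an error rather than silently skipped, and the loop continues so that smaller files later in the selection can still fit. Schematically:

# Selected-files variant of the gate: report, don't silently skip.
if batch_estimated_pages + file_pages > remaining_quota:
    errors.append(f"File '{display}': page limit would be exceeded")
    continue  # a smaller later file may still fit under the quota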

@@ -34,6 +34,7 @@ from app.indexing_pipeline.indexing_pipeline_service import (
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
@@ -327,6 +328,12 @@ async def _process_single_file(
return 1, 0, 0
return 0, 1, 0
page_limit_service = PageLimitService(session)
estimated_pages = PageLimitService.estimate_pages_from_metadata(
file_name, file.get("size")
)
await page_limit_service.check_page_limit(user_id, estimated_pages)
markdown, drive_metadata, error = await download_and_extract_content(
drive_client, file
)
@@ -363,6 +370,9 @@
)
await pipeline.index(document, connector_doc, user_llm)
await page_limit_service.update_page_usage(
user_id, estimated_pages, allow_exceed=True
)
logger.info(f"Successfully indexed Google Drive file: {file_name}")
return 1, 0, 0
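
Unlike the batch paths, this single-file path checks the quota before downloading and charges it only after a successful index. Simplified from the hunks above (the elided steps build the document as in the original):

# Simplified shape of the single-file path: gate first, charge on success.
estimated_pages = PageLimitService.estimate_pages_from_metadata(
    file_name, file.get("size")
)
await page_limit_service.check_page_limit(user_id, estimated_pages)  # may raise

markdown, drive_metadata, error = await download_and_extract_content(
    drive_client, file
)
# ... build document and connector_doc, then index ...
await pipeline.index(document, connector_doc, user_llm)

# Charge the quota only once indexing has succeeded.
await page_limit_service.update_page_usage(user_id, estimated_pages, allow_exceed=True)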
@@ -466,6 +476,11 @@ async def _index_selected_files(
Returns (indexed_count, skipped_count, errors).
"""
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
files_to_download: list[dict] = []
errors: list[str] = []
renamed_count = 0
@@ -486,6 +501,15 @@
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
display = file_name or file_id
errors.append(f"File '{display}': page limit would be exceeded")
continue
batch_estimated_pages += file_pages
files_to_download.append(file)
await _create_drive_placeholders(
@@ -507,6 +531,14 @@
on_heartbeat=on_heartbeat,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
@@ -545,6 +577,12 @@ async def _index_full_scan(
# ------------------------------------------------------------------
# Phase 1 (serial): collect files, run skip checks, track renames
# ------------------------------------------------------------------
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
page_limit_reached = False
renamed_count = 0
skipped = 0
files_processed = 0
@@ -593,6 +631,20 @@
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
if not page_limit_reached:
logger.warning(
"Page limit reached during Google Drive full scan, "
"skipping remaining files"
)
page_limit_reached = True
skipped += 1
continue
batch_estimated_pages += file_pages
files_to_download.append(file)
page_token = next_token
@@ -636,6 +688,14 @@
on_heartbeat=on_heartbeat_callback,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
@@ -686,6 +746,12 @@ async def _index_with_delta_sync(
# ------------------------------------------------------------------
# Phase 1 (serial): handle removals, collect files for download
# ------------------------------------------------------------------
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
page_limit_reached = False
renamed_count = 0
skipped = 0
files_to_download: list[dict] = []
@@ -715,6 +781,20 @@
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
if not page_limit_reached:
logger.warning(
"Page limit reached during Google Drive delta sync, "
"skipping remaining files"
)
page_limit_reached = True
skipped += 1
continue
batch_estimated_pages += file_pages
files_to_download.append(file)
# ------------------------------------------------------------------
@@ -742,6 +822,14 @@
on_heartbeat=on_heartbeat_callback,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"

@@ -79,6 +79,7 @@ def _compute_final_pages(
actual = page_limit_service.estimate_pages_from_content_length(content_length)
return max(estimated_pages, actual)
DEFAULT_EXCLUDE_PATTERNS = [
".git",
"node_modules",

@@ -28,6 +28,7 @@ from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
@@ -291,6 +292,11 @@ async def _index_selected_files(
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
"""Index user-selected files using the parallel pipeline."""
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
files_to_download: list[dict] = []
errors: list[str] = []
renamed_count = 0
@@ -311,6 +317,15 @@
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
display = file_name or file_id
errors.append(f"File '{display}': page limit would be exceeded")
continue
batch_estimated_pages += file_pages
files_to_download.append(file)
batch_indexed, _failed = await _download_and_index(
@@ -324,6 +339,14 @@
on_heartbeat=on_heartbeat,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
@@ -358,6 +381,12 @@ async def _index_full_scan(
},
)
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
page_limit_reached = False
renamed_count = 0
skipped = 0
files_to_download: list[dict] = []
@@ -383,6 +412,21 @@
else:
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
if not page_limit_reached:
logger.warning(
"Page limit reached during OneDrive full scan, "
"skipping remaining files"
)
page_limit_reached = True
skipped += 1
continue
batch_estimated_pages += file_pages
files_to_download.append(file)
batch_indexed, failed = await _download_and_index(
@@ -396,6 +440,14 @@
on_heartbeat=on_heartbeat_callback,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
@@ -441,6 +493,12 @@ async def _index_with_delta_sync(
logger.info(f"Processing {len(changes)} delta changes")
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
remaining_quota = pages_limit - pages_used
batch_estimated_pages = 0
page_limit_reached = False
renamed_count = 0
skipped = 0
files_to_download: list[dict] = []
@@ -471,6 +529,20 @@
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
change.get("name", ""), change.get("size")
)
if batch_estimated_pages + file_pages > remaining_quota:
if not page_limit_reached:
logger.warning(
"Page limit reached during OneDrive delta sync, "
"skipping remaining files"
)
page_limit_reached = True
skipped += 1
continue
batch_estimated_pages += file_pages
files_to_download.append(change)
batch_indexed, failed = await _download_and_index(
@@ -484,6 +556,14 @@
on_heartbeat=on_heartbeat_callback,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
pages_to_deduct = max(
1, batch_estimated_pages * batch_indexed // len(files_to_download)
)
await page_limit_service.update_page_usage(
user_id, pages_to_deduct, allow_exceed=True
)
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"