feat: add processing mode support for document uploads and ETL pipeline, improded error handling ux
Some checks are pending
Build and Push Docker Images / tag_release (push) Waiting to run
Build and Push Docker Images / build (./surfsense_backend, ./surfsense_backend/Dockerfile, backend, surfsense-backend, ubuntu-24.04-arm, linux/arm64, arm64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_backend, ./surfsense_backend/Dockerfile, backend, surfsense-backend, ubuntu-latest, linux/amd64, amd64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_web, ./surfsense_web/Dockerfile, web, surfsense-web, ubuntu-24.04-arm, linux/arm64, arm64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_web, ./surfsense_web/Dockerfile, web, surfsense-web, ubuntu-latest, linux/amd64, amd64) (push) Blocked by required conditions
Build and Push Docker Images / create_manifest (backend, surfsense-backend) (push) Blocked by required conditions
Build and Push Docker Images / create_manifest (web, surfsense-web) (push) Blocked by required conditions

- Introduced a `ProcessingMode` enum to differentiate between basic and premium processing modes.
- Updated `EtlRequest` to include a `processing_mode` field, defaulting to basic.
- Enhanced ETL pipeline services to utilize the selected processing mode for Azure Document Intelligence and LlamaCloud parsing.
- Modified various routes and services to handle processing mode, affecting document upload and indexing tasks.
- Improved error handling and logging to include processing mode details.
- Added tests to validate processing mode functionality and its impact on ETL operations.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-04-14 21:26:00 -07:00
parent b659f41bab
commit 656e061f84
104 changed files with 1900 additions and 909 deletions

View file

@ -780,6 +780,7 @@ def process_file_upload_with_document_task(
user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False,
processing_mode: str = "basic",
):
"""
Celery task to process uploaded file with existing pending document.
@ -836,6 +837,7 @@ def process_file_upload_with_document_task(
user_id,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
)
logger.info(
@ -873,6 +875,7 @@ async def _process_file_with_document(
user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False,
processing_mode: str = "basic",
):
"""
Process file and update existing pending document status.
@ -976,6 +979,7 @@ async def _process_file_with_document(
notification=notification,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
# Update notification on success
@ -1434,6 +1438,7 @@ def index_uploaded_folder_files_task(
enable_summary: bool,
file_mappings: list[dict],
use_vision_llm: bool = False,
processing_mode: str = "basic",
):
"""Celery task to index files uploaded from the desktop app."""
loop = asyncio.new_event_loop()
@ -1448,6 +1453,7 @@ def index_uploaded_folder_files_task(
enable_summary=enable_summary,
file_mappings=file_mappings,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
)
finally:
@ -1462,6 +1468,7 @@ async def _index_uploaded_folder_files_async(
enable_summary: bool,
file_mappings: list[dict],
use_vision_llm: bool = False,
processing_mode: str = "basic",
):
"""Run upload-based folder indexing with notification + heartbeat."""
file_count = len(file_mappings)
@ -1512,6 +1519,7 @@ async def _index_uploaded_folder_files_async(
file_mappings=file_mappings,
on_heartbeat_callback=_heartbeat_progress,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
if notification:

View file

@ -60,14 +60,16 @@ async def _check_page_limit_or_skip(
page_limit_service: PageLimitService,
user_id: str,
file_path: str,
) -> int:
page_multiplier: int = 1,
) -> tuple[int, int]:
"""Estimate pages and check the limit; raises PageLimitExceededError if over quota.
Returns the estimated page count on success.
Returns (estimated_pages, billable_pages).
"""
estimated = _estimate_pages_safe(page_limit_service, file_path)
await page_limit_service.check_page_limit(user_id, estimated)
return estimated
billable = estimated * page_multiplier
await page_limit_service.check_page_limit(user_id, billable)
return estimated, billable
def _compute_final_pages(
@ -153,17 +155,20 @@ def scan_folder(
return files
async def _read_file_content(file_path: str, filename: str, *, vision_llm=None) -> str:
async def _read_file_content(
file_path: str, filename: str, *, vision_llm=None, processing_mode: str = "basic"
) -> str:
"""Read file content via the unified ETL pipeline.
All file types (plaintext, audio, direct-convert, document, image) are
handled by ``EtlPipelineService``.
"""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
mode = ProcessingMode.coerce(processing_mode)
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
EtlRequest(file_path=file_path, filename=filename, processing_mode=mode)
)
return result.markdown_content
@ -201,12 +206,15 @@ async def _compute_file_content_hash(
search_space_id: int,
*,
vision_llm=None,
processing_mode: str = "basic",
) -> tuple[str, str]:
"""Read a file (via ETL if needed) and compute its content hash.
Returns (content_text, content_hash).
"""
content = await _read_file_content(file_path, filename, vision_llm=vision_llm)
content = await _read_file_content(
file_path, filename, vision_llm=vision_llm, processing_mode=processing_mode
)
return content, _content_hash(content, search_space_id)
@ -694,7 +702,7 @@ async def index_local_folder(
continue
try:
estimated_pages = await _check_page_limit_or_skip(
estimated_pages, _billable = await _check_page_limit_or_skip(
page_limit_service, user_id, file_path_abs
)
except PageLimitExceededError:
@ -730,7 +738,7 @@ async def index_local_folder(
await create_version_snapshot(session, existing_document)
else:
try:
estimated_pages = await _check_page_limit_or_skip(
estimated_pages, _billable = await _check_page_limit_or_skip(
page_limit_service, user_id, file_path_abs
)
except PageLimitExceededError:
@ -1080,7 +1088,7 @@ async def _index_single_file(
page_limit_service = PageLimitService(session)
try:
estimated_pages = await _check_page_limit_or_skip(
estimated_pages, _billable = await _check_page_limit_or_skip(
page_limit_service, user_id, str(full_path)
)
except PageLimitExceededError as e:
@ -1271,6 +1279,7 @@ async def index_uploaded_files(
file_mappings: list[dict],
on_heartbeat_callback: HeartbeatCallbackType | None = None,
use_vision_llm: bool = False,
processing_mode: str = "basic",
) -> tuple[int, int, str | None]:
"""Index files uploaded from the desktop app via temp paths.
@ -1281,12 +1290,16 @@ async def index_uploaded_files(
Returns ``(indexed_count, failed_count, error_summary_or_none)``.
"""
from app.etl_pipeline.etl_document import ProcessingMode
mode = ProcessingMode.coerce(processing_mode)
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="local_folder_indexing",
source="uploaded_folder_indexing",
message=f"Indexing {len(file_mappings)} uploaded file(s) for {folder_name}",
metadata={"file_count": len(file_mappings)},
metadata={"file_count": len(file_mappings), "processing_mode": mode.value},
)
try:
@ -1350,8 +1363,11 @@ async def index_uploaded_files(
continue
try:
estimated_pages = await _check_page_limit_or_skip(
page_limit_service, user_id, temp_path
estimated_pages, _billable_pages = await _check_page_limit_or_skip(
page_limit_service,
user_id,
temp_path,
page_multiplier=mode.page_multiplier,
)
except PageLimitExceededError:
logger.warning(f"Page limit exceeded, skipping: {relative_path}")
@ -1364,6 +1380,7 @@ async def index_uploaded_files(
filename,
search_space_id,
vision_llm=vision_llm_instance,
processing_mode=mode.value,
)
except Exception as e:
logger.warning(f"Could not read {relative_path}: {e}")
@ -1429,8 +1446,9 @@ async def index_uploaded_files(
final_pages = _compute_final_pages(
page_limit_service, estimated_pages, len(content)
)
final_billable = final_pages * mode.page_multiplier
await page_limit_service.update_page_usage(
user_id, final_pages, allow_exceed=True
user_id, final_billable, allow_exceed=True
)
else:
failed_count += 1

View file

@ -47,6 +47,7 @@ class _ProcessingContext:
connector: dict | None = None
notification: Notification | None = None
use_vision_llm: bool = False
processing_mode: str = "basic"
enable_summary: bool = field(init=False)
def __post_init__(self) -> None:
@ -187,21 +188,28 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No
async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Route a document file to the configured ETL service via the unified pipeline."""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
mode = ProcessingMode.coerce(ctx.processing_mode)
page_limit_service = PageLimitService(ctx.session)
estimated_pages = _estimate_pages_safe(page_limit_service, ctx.file_path)
billable_pages = estimated_pages * mode.page_multiplier
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Estimated {estimated_pages} pages for file: {ctx.filename}",
{"estimated_pages": estimated_pages, "file_type": "document"},
{
"estimated_pages": estimated_pages,
"billable_pages": billable_pages,
"processing_mode": mode.value,
"file_type": "document",
},
)
try:
await page_limit_service.check_page_limit(ctx.user_id, estimated_pages)
await page_limit_service.check_page_limit(ctx.user_id, billable_pages)
except PageLimitExceededError as e:
await ctx.task_logger.log_task_failure(
ctx.log_entry,
@ -212,6 +220,8 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
"pages_used": e.pages_used,
"pages_limit": e.pages_limit,
"estimated_pages": estimated_pages,
"billable_pages": billable_pages,
"processing_mode": mode.value,
},
)
with contextlib.suppress(Exception):
@ -225,6 +235,7 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
file_path=ctx.file_path,
filename=ctx.filename,
estimated_pages=estimated_pages,
processing_mode=mode,
)
)
@ -246,7 +257,7 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
if result:
await page_limit_service.update_page_usage(
ctx.user_id, estimated_pages, allow_exceed=True
ctx.user_id, billable_pages, allow_exceed=True
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
@ -259,6 +270,8 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
"file_type": "document",
"etl_service": etl_result.etl_service,
"pages_processed": estimated_pages,
"billable_pages": billable_pages,
"processing_mode": mode.value,
},
)
else:
@ -290,6 +303,7 @@ async def process_file_in_background(
connector: dict | None = None,
notification: Notification | None = None,
use_vision_llm: bool = False,
processing_mode: str = "basic",
) -> Document | None:
ctx = _ProcessingContext(
session=session,
@ -302,6 +316,7 @@ async def process_file_in_background(
connector=connector,
notification=notification,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
try:
@ -353,22 +368,25 @@ async def _extract_file_content(
log_entry: Log,
notification: Notification | None,
use_vision_llm: bool = False,
) -> tuple[str, str]:
processing_mode: str = "basic",
) -> tuple[str, str, int]:
"""
Extract markdown content from a file regardless of type.
Returns:
Tuple of (markdown_content, etl_service_name).
Tuple of (markdown_content, etl_service_name, billable_pages).
"""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.etl_pipeline.file_classifier import (
FileCategory,
classify_file as etl_classify,
)
mode = ProcessingMode.coerce(processing_mode)
category = etl_classify(filename)
estimated_pages = 0
billable_pages = 0
if notification:
stage_messages = {
@ -397,7 +415,8 @@ async def _extract_file_content(
page_limit_service = PageLimitService(session)
estimated_pages = _estimate_pages_safe(page_limit_service, file_path)
await page_limit_service.check_page_limit(user_id, estimated_pages)
billable_pages = estimated_pages * mode.page_multiplier
await page_limit_service.check_page_limit(user_id, billable_pages)
vision_llm = None
if use_vision_llm and category == FileCategory.IMAGE:
@ -410,21 +429,17 @@ async def _extract_file_content(
file_path=file_path,
filename=filename,
estimated_pages=estimated_pages,
processing_mode=mode,
)
)
if category == FileCategory.DOCUMENT:
await page_limit_service.update_page_usage(
user_id, estimated_pages, allow_exceed=True
)
with contextlib.suppress(Exception):
os.unlink(file_path)
if not result.markdown_content:
raise RuntimeError(f"Failed to extract content from file: {filename}")
return result.markdown_content, result.etl_service
return result.markdown_content, result.etl_service, billable_pages
async def process_file_in_background_with_document(
@ -440,12 +455,16 @@ async def process_file_in_background_with_document(
notification: Notification | None = None,
should_summarize: bool = False,
use_vision_llm: bool = False,
processing_mode: str = "basic",
) -> Document | None:
"""
Process file and update existing pending document (2-phase pattern).
Phase 1 (API layer): Created document with pending status.
Phase 2 (this function): Process file and update document to ready/failed.
Page usage is deferred until after dedup check and successful indexing
to avoid charging for duplicate or failed uploads.
"""
from app.indexing_pipeline.adapters.file_upload_adapter import (
UploadDocumentAdapter,
@ -458,8 +477,7 @@ async def process_file_in_background_with_document(
doc_id = document.id
try:
# Step 1: extract content
markdown_content, etl_service = await _extract_file_content(
markdown_content, etl_service, billable_pages = await _extract_file_content(
file_path,
filename,
search_space_id,
@ -469,12 +487,12 @@ async def process_file_in_background_with_document(
log_entry,
notification,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
if not markdown_content:
raise RuntimeError(f"Failed to extract content from file: {filename}")
# Step 2: duplicate check
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_by_content = await check_duplicate_document(session, content_hash)
if existing_by_content and existing_by_content.id != doc_id:
@ -484,7 +502,6 @@ async def process_file_in_background_with_document(
)
return None
# Step 3: index via pipeline
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
@ -505,6 +522,14 @@ async def process_file_in_background_with_document(
should_summarize=should_summarize,
)
if billable_pages > 0:
from app.services.page_limit_service import PageLimitService
page_limit_service = PageLimitService(session)
await page_limit_service.update_page_usage(
user_id, billable_pages, allow_exceed=True
)
await task_logger.log_task_success(
log_entry,
f"Successfully processed file: {filename}",
@ -512,6 +537,8 @@ async def process_file_in_background_with_document(
"document_id": doc_id,
"content_hash": content_hash,
"file_type": etl_service,
"billable_pages": billable_pages,
"processing_mode": processing_mode,
},
)
return document