Make Vision LLM opt-in for uploads and connectors

Commit: a95bf58c8f (parent: 0aefcbd504)
Author: CREDO23
Date: 2026-04-10 16:45:51 +02:00
24 changed files with 276 additions and 20 deletions

View file

@ -778,6 +778,7 @@ def process_file_upload_with_document_task(
search_space_id: int,
user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False,
):
"""
Celery task to process uploaded file with existing pending document.
@ -833,6 +834,7 @@ def process_file_upload_with_document_task(
search_space_id,
user_id,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm,
)
)
logger.info(
@ -869,6 +871,7 @@ async def _process_file_with_document(
search_space_id: int,
user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False,
):
"""
Process file and update existing pending document status.
@ -971,6 +974,7 @@ async def _process_file_with_document(
log_entry=log_entry,
notification=notification,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm,
)
# Update notification on success
@ -1428,6 +1432,7 @@ def index_uploaded_folder_files_task(
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
use_vision_llm: bool = False,
):
"""Celery task to index files uploaded from the desktop app."""
loop = asyncio.new_event_loop()
@ -1441,6 +1446,7 @@ def index_uploaded_folder_files_task(
root_folder_id=root_folder_id,
enable_summary=enable_summary,
file_mappings=file_mappings,
use_vision_llm=use_vision_llm,
)
)
finally:
@ -1454,6 +1460,7 @@ async def _index_uploaded_folder_files_async(
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
use_vision_llm: bool = False,
):
"""Run upload-based folder indexing with notification + heartbeat."""
file_count = len(file_mappings)
@ -1503,6 +1510,7 @@ async def _index_uploaded_folder_files_async(
enable_summary=enable_summary,
file_mappings=file_mappings,
on_heartbeat_callback=_heartbeat_progress,
use_vision_llm=use_vision_llm,
)
if notification:

View file

@ -164,6 +164,7 @@ async def _download_files_parallel(
enable_summary: bool,
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel. Returns (docs, failed_count)."""
results: list[ConnectorDocument] = []
@ -176,7 +177,7 @@ async def _download_files_parallel(
nonlocal last_heartbeat, completed_count
async with sem:
markdown, db_metadata, error = await download_and_extract_content(
dropbox_client, file
dropbox_client, file, vision_llm=vision_llm
)
if error or not markdown:
file_name = file.get("name", "Unknown")
@ -224,6 +225,7 @@ async def _download_and_index(
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int]:
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
connector_docs, download_failed = await _download_files_parallel(
@ -234,6 +236,7 @@ async def _download_and_index(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
batch_indexed = 0
@ -287,6 +290,7 @@ async def _index_with_delta_sync(
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int, str]:
"""Delta sync using Dropbox cursor-based change tracking.
@ -359,6 +363,7 @@ async def _index_with_delta_sync(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
indexed = renamed_count + batch_indexed
@ -384,6 +389,7 @@ async def _index_full_scan(
incremental_sync: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
@ -469,6 +475,7 @@ async def _index_full_scan(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -498,6 +505,7 @@ async def _index_selected_files(
enable_summary: bool,
incremental_sync: bool = True,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int, int, list[str]]:
"""Index user-selected files using the parallel pipeline."""
page_limit_service = PageLimitService(session)
@ -557,6 +565,7 @@ async def _index_selected_files(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -621,6 +630,13 @@ async def index_dropbox_files(
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
dropbox_client = DropboxClient(session, connector_id)
indexing_options = items_dict.get("indexing_options", {})
@ -650,6 +666,7 @@ async def index_dropbox_files(
user_id=user_id,
enable_summary=connector_enable_summary,
incremental_sync=incremental_sync,
vision_llm=vision_llm,
)
total_indexed += indexed
total_skipped += skipped
@ -684,6 +701,7 @@ async def index_dropbox_files(
log_entry,
max_files,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
folder_cursors[folder_path] = new_cursor
total_unsupported += unsup
@ -703,6 +721,7 @@ async def index_dropbox_files(
include_subfolders,
incremental_sync=incremental_sync,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_unsupported += unsup

View file

@ -261,6 +261,7 @@ async def _download_files_parallel(
enable_summary: bool,
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel, returning ConnectorDocuments.
@ -276,7 +277,7 @@ async def _download_files_parallel(
nonlocal last_heartbeat, completed_count
async with sem:
markdown, drive_metadata, error = await download_and_extract_content(
drive_client, file
drive_client, file, vision_llm=vision_llm
)
if error or not markdown:
file_name = file.get("name", "Unknown")
@ -322,6 +323,7 @@ async def _process_single_file(
search_space_id: int,
user_id: str,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Download, extract, and index a single Drive file via the pipeline.
@ -343,7 +345,7 @@ async def _process_single_file(
await page_limit_service.check_page_limit(user_id, estimated_pages)
markdown, drive_metadata, error = await download_and_extract_content(
drive_client, file
drive_client, file, vision_llm=vision_llm
)
if error or not markdown:
logger.warning(f"ETL failed for {file_name}: {error}")
@ -433,6 +435,7 @@ async def _download_and_index(
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int]:
"""Phase 2+3: parallel download then parallel indexing.
@ -446,6 +449,7 @@ async def _download_and_index(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
batch_indexed = 0
@ -476,6 +480,7 @@ async def _index_selected_files(
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int, int, list[str]]:
"""Index user-selected files using the parallel pipeline.
@ -540,6 +545,7 @@ async def _index_selected_files(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -573,6 +579,7 @@ async def _index_full_scan(
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
@ -703,6 +710,7 @@ async def _index_full_scan(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -736,6 +744,7 @@ async def _index_with_delta_sync(
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Delta sync using change tracking.
@ -844,6 +853,7 @@ async def _index_with_delta_sync(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -947,6 +957,11 @@ async def index_google_drive_files(
)
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
@ -986,6 +1001,7 @@ async def index_google_drive_files(
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
vision_llm=vision_llm,
)
documents_unsupported += du
logger.info("Running reconciliation scan after delta sync")
@ -1004,6 +1020,7 @@ async def index_google_drive_files(
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
vision_llm=vision_llm,
)
documents_indexed += ri
documents_skipped += rs
@ -1029,6 +1046,7 @@ async def index_google_drive_files(
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
vision_llm=vision_llm,
)
if documents_indexed > 0 or can_use_delta:
@ -1146,6 +1164,11 @@ async def index_google_drive_single_file(
)
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
@ -1168,6 +1191,7 @@ async def index_google_drive_single_file(
search_space_id,
user_id,
connector_enable_summary,
vision_llm=vision_llm,
)
await session.commit()
@ -1278,6 +1302,11 @@ async def index_google_drive_selected_files(
return 0, 0, [error_msg]
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
@ -1291,6 +1320,7 @@ async def index_google_drive_selected_files(
user_id=user_id,
enable_summary=connector_enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
if unsupported > 0:

View file

@ -153,7 +153,7 @@ def scan_folder(
return files
async def _read_file_content(file_path: str, filename: str) -> str:
async def _read_file_content(file_path: str, filename: str, *, vision_llm=None) -> str:
"""Read file content via the unified ETL pipeline.
All file types (plaintext, audio, direct-convert, document, image) are
@ -162,7 +162,7 @@ async def _read_file_content(file_path: str, filename: str) -> str:
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
result = await EtlPipelineService().extract(
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
)
return result.markdown_content
@ -199,12 +199,14 @@ async def _compute_file_content_hash(
file_path: str,
filename: str,
search_space_id: int,
*,
vision_llm=None,
) -> tuple[str, str]:
"""Read a file (via ETL if needed) and compute its content hash.
Returns (content_text, content_hash).
"""
content = await _read_file_content(file_path, filename)
content = await _read_file_content(file_path, filename, vision_llm=vision_llm)
return content, _content_hash(content, search_space_id)
@ -1268,6 +1270,7 @@ async def index_uploaded_files(
enable_summary: bool,
file_mappings: list[dict],
on_heartbeat_callback: HeartbeatCallbackType | None = None,
use_vision_llm: bool = False,
) -> tuple[int, int, str | None]:
"""Index files uploaded from the desktop app via temp paths.
@ -1304,6 +1307,12 @@ async def index_uploaded_files(
pipeline = IndexingPipelineService(session)
llm = await get_user_long_context_llm(session, user_id, search_space_id)
vision_llm_instance = None
if use_vision_llm:
from app.services.llm_service import get_vision_llm
vision_llm_instance = await get_vision_llm(session, search_space_id)
indexed_count = 0
failed_count = 0
errors: list[str] = []
@ -1351,7 +1360,8 @@ async def index_uploaded_files(
try:
content, content_hash = await _compute_file_content_hash(
temp_path, filename, search_space_id
temp_path, filename, search_space_id,
vision_llm=vision_llm_instance,
)
except Exception as e:
logger.warning(f"Could not read {relative_path}: {e}")

View file

@ -171,6 +171,7 @@ async def _download_files_parallel(
enable_summary: bool,
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel. Returns (docs, failed_count)."""
results: list[ConnectorDocument] = []
@ -183,7 +184,7 @@ async def _download_files_parallel(
nonlocal last_heartbeat, completed_count
async with sem:
markdown, od_metadata, error = await download_and_extract_content(
onedrive_client, file
onedrive_client, file, vision_llm=vision_llm
)
if error or not markdown:
file_name = file.get("name", "Unknown")
@ -231,6 +232,7 @@ async def _download_and_index(
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int]:
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
connector_docs, download_failed = await _download_files_parallel(
@ -241,6 +243,7 @@ async def _download_and_index(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
batch_indexed = 0
@ -293,6 +296,7 @@ async def _index_selected_files(
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int, int, list[str]]:
"""Index user-selected files using the parallel pipeline."""
page_limit_service = PageLimitService(session)
@ -343,6 +347,7 @@ async def _index_selected_files(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -375,6 +380,7 @@ async def _index_full_scan(
include_subfolders: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
@ -450,6 +456,7 @@ async def _index_full_scan(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -481,6 +488,7 @@ async def _index_with_delta_sync(
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int, str | None]:
"""Delta sync using OneDrive change tracking.
@ -573,6 +581,7 @@ async def _index_with_delta_sync(
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0:
@ -643,6 +652,12 @@ async def index_onedrive_files(
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
onedrive_client = OneDriveClient(session, connector_id)
indexing_options = items_dict.get("indexing_options", {})
@ -666,6 +681,7 @@ async def index_onedrive_files(
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += indexed
total_skipped += skipped
@ -695,6 +711,7 @@ async def index_onedrive_files(
log_entry,
max_files,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += indexed
total_skipped += skipped
@ -721,6 +738,7 @@ async def index_onedrive_files(
max_files,
include_subfolders,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += ri
total_skipped += rs
@ -740,6 +758,7 @@ async def index_onedrive_files(
max_files,
include_subfolders,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += indexed
total_skipped += skipped

View file

@ -46,6 +46,7 @@ class _ProcessingContext:
log_entry: Log
connector: dict | None = None
notification: Notification | None = None
use_vision_llm: bool = False
enable_summary: bool = field(init=False)
def __post_init__(self) -> None:
@ -134,7 +135,7 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No
)
vision_llm = None
if etl_classify(ctx.filename) == FileCategory.IMAGE:
if ctx.use_vision_llm and etl_classify(ctx.filename) == FileCategory.IMAGE:
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(ctx.session, ctx.search_space_id)
@ -288,6 +289,7 @@ async def process_file_in_background(
log_entry: Log,
connector: dict | None = None,
notification: Notification | None = None,
use_vision_llm: bool = False,
) -> Document | None:
ctx = _ProcessingContext(
session=session,
@ -299,6 +301,7 @@ async def process_file_in_background(
log_entry=log_entry,
connector=connector,
notification=notification,
use_vision_llm=use_vision_llm,
)
try:
@ -349,6 +352,7 @@ async def _extract_file_content(
task_logger: TaskLoggingService,
log_entry: Log,
notification: Notification | None,
use_vision_llm: bool = False,
) -> tuple[str, str]:
"""
Extract markdown content from a file regardless of type.
@ -396,7 +400,7 @@ async def _extract_file_content(
await page_limit_service.check_page_limit(user_id, estimated_pages)
vision_llm = None
if category == FileCategory.IMAGE:
if use_vision_llm and category == FileCategory.IMAGE:
from app.services.llm_service import get_vision_llm
vision_llm = await get_vision_llm(session, search_space_id)
@ -435,6 +439,7 @@ async def process_file_in_background_with_document(
connector: dict | None = None,
notification: Notification | None = None,
should_summarize: bool = False,
use_vision_llm: bool = False,
) -> Document | None:
"""
Process file and update existing pending document (2-phase pattern).
@ -463,6 +468,7 @@ async def process_file_in_background_with_document(
task_logger,
log_entry,
notification,
use_vision_llm=use_vision_llm,
)
if not markdown_content: