Merge commit '056fc0e7ff' into dev_mod

DESKTOP-RTLN3BA\$punk 2026-04-07 02:56:46 -07:00
commit 82b5c7f19e
111 changed files with 4056 additions and 2219 deletions

View file

@ -225,6 +225,55 @@ class DropboxClient:
return all_items, None
async def get_latest_cursor(self, path: str = "") -> tuple[str | None, str | None]:
"""Get a cursor representing the current state of a folder.
Uses /2/files/list_folder/get_latest_cursor so we can later call
get_changes to receive only incremental updates.
"""
resp = await self._request(
"/2/files/list_folder/get_latest_cursor",
{"path": path, "recursive": False, "include_non_downloadable_files": True},
)
if resp.status_code != 200:
return None, f"Failed to get cursor: {resp.status_code} - {resp.text}"
return resp.json().get("cursor"), None
async def get_changes(
self, cursor: str
) -> tuple[list[dict[str, Any]], str | None, str | None]:
"""Fetch incremental changes since the given cursor.
Calls /2/files/list_folder/continue and handles pagination.
Returns (entries, new_cursor, error).
"""
all_entries: list[dict[str, Any]] = []
resp = await self._request("/2/files/list_folder/continue", {"cursor": cursor})
if resp.status_code == 401:
return [], None, "Dropbox authentication expired (401)"
if resp.status_code != 200:
return [], None, f"Failed to get changes: {resp.status_code} - {resp.text}"
data = resp.json()
all_entries.extend(data.get("entries", []))
while data.get("has_more"):
cursor = data["cursor"]
resp = await self._request(
"/2/files/list_folder/continue", {"cursor": cursor}
)
if resp.status_code != 200:
return (
all_entries,
data.get("cursor"),
f"Pagination failed: {resp.status_code}",
)
data = resp.json()
all_entries.extend(data.get("entries", []))
return all_entries, data.get("cursor"), None
async def get_metadata(self, path: str) -> tuple[dict[str, Any] | None, str | None]:
resp = await self._request("/2/files/get_metadata", {"path": path})
if resp.status_code != 200:

View file

@ -53,7 +53,8 @@ async def download_and_extract_content(
file_name = file.get("name", "Unknown")
file_id = file.get("id", "")
if should_skip_file(file):
skip, _unsup_ext = should_skip_file(file)
if skip:
return None, {}, "Skipping non-indexable item"
logger.info(f"Downloading file for content extraction: {file_name}")
@ -87,9 +88,13 @@ async def download_and_extract_content(
if error:
return None, metadata, error
from app.connectors.onedrive.content_extractor import _parse_file_to_markdown
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
markdown = await _parse_file_to_markdown(temp_file_path, file_name)
result = await EtlPipelineService().extract(
EtlRequest(file_path=temp_file_path, filename=file_name)
)
markdown = result.markdown_content
return markdown, metadata, None
except Exception as e:

View file

@ -1,8 +1,8 @@
"""File type handlers for Dropbox."""
PAPER_EXTENSION = ".paper"
from app.etl_pipeline.file_classifier import should_skip_for_service
SKIP_EXTENSIONS: frozenset[str] = frozenset()
PAPER_EXTENSION = ".paper"
MIME_TO_EXTENSION: dict[str, str] = {
"application/pdf": ".pdf",
@ -42,17 +42,25 @@ def is_paper_file(item: dict) -> bool:
return ext == PAPER_EXTENSION
def should_skip_file(item: dict) -> bool:
def should_skip_file(item: dict) -> tuple[bool, str | None]:
"""Skip folders and truly non-indexable files.
Paper docs are non-downloadable but exportable, so they are NOT skipped.
Returns (should_skip, unsupported_extension_or_None).
"""
if is_folder(item):
return True
return True, None
if is_paper_file(item):
return False
return False, None
if not item.get("is_downloadable", True):
return True
return True, None
from pathlib import PurePosixPath
from app.config import config as app_config
name = item.get("name", "")
ext = get_extension_from_name(name).lower()
return ext in SKIP_EXTENSIONS
if should_skip_for_service(name, app_config.ETL_SERVICE):
ext = PurePosixPath(name).suffix.lower()
return True, ext
return False, None
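A minimal sketch of how call sites elsewhere in this commit consume the new tuple return; the sample items are hypothetical, and it assumes .pdf is supported by the configured ETL service while .blend is not:

# Hypothetical items; real entries come from the Dropbox list_folder API.
files: list[dict] = []
unsupported_count = 0
for item in [
    {"name": "notes.pdf", ".tag": "file", "is_downloadable": True},
    {"name": "scene.blend", ".tag": "file", "is_downloadable": True},
]:
    skip, unsup_ext = should_skip_file(item)
    if skip:
        if unsup_ext:
            unsupported_count += 1  # extension rejected by the configured ETL service
        continue                    # folder / non-downloadable / skipped item
    files.append(item)              # queue for download and indexing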

View file

@ -64,8 +64,10 @@ async def get_files_in_folder(
)
continue
files.extend(sub_files)
elif not should_skip_file(item):
files.append(item)
else:
skip, _unsup_ext = should_skip_file(item)
if not skip:
files.append(item)
return files, None

View file

@ -1,12 +1,9 @@
"""Content extraction for Google Drive files."""
import asyncio
import contextlib
import logging
import os
import tempfile
import threading
import time
from pathlib import Path
from typing import Any
@ -20,6 +17,7 @@ from .file_types import (
get_export_mime_type,
get_extension_from_mime,
is_google_workspace_file,
should_skip_by_extension,
should_skip_file,
)
@ -45,6 +43,11 @@ async def download_and_extract_content(
if should_skip_file(mime_type):
return None, {}, f"Skipping {mime_type}"
if not is_google_workspace_file(mime_type):
ext_skip, _unsup_ext = should_skip_by_extension(file_name)
if ext_skip:
return None, {}, f"Skipping unsupported extension: {file_name}"
logger.info(f"Downloading file for content extraction: {file_name} ({mime_type})")
drive_metadata: dict[str, Any] = {
@ -97,7 +100,10 @@ async def download_and_extract_content(
if error:
return None, drive_metadata, error
markdown = await _parse_file_to_markdown(temp_file_path, file_name)
etl_filename = (
file_name + extension if is_google_workspace_file(mime_type) else file_name
)
markdown = await _parse_file_to_markdown(temp_file_path, etl_filename)
return markdown, drive_metadata, None
except Exception as e:
@ -110,99 +116,14 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
"""Parse a local file to markdown using the configured ETL service."""
lower = filename.lower()
"""Parse a local file to markdown using the unified ETL pipeline."""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
if lower.endswith((".md", ".markdown", ".txt")):
with open(file_path, encoding="utf-8") as f:
return f.read()
if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
from litellm import atranscription
from app.config import config as app_config
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
t0 = time.monotonic()
logger.info(
f"[local-stt] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(stt_service.transcribe_file, file_path)
logger.info(
f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
text = result.get("text", "")
else:
with open(file_path, "rb") as audio_file:
kwargs: dict[str, Any] = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
resp = await atranscription(**kwargs)
text = resp.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
return f"# Transcription of {filename}\n\n{text}"
# Document files -- use configured ETL service
from app.config import config as app_config
if app_config.ETL_SERVICE == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
return await convert_document_to_markdown(docs)
if app_config.ETL_SERVICE == "LLAMACLOUD":
from app.tasks.document_processors.file_processors import (
parse_with_llamacloud_retry,
)
result = await parse_with_llamacloud_retry(
file_path=file_path, estimated_pages=50
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
return markdown_documents[0].text
if app_config.ETL_SERVICE == "DOCLING":
from docling.document_converter import DocumentConverter
converter = DocumentConverter()
t0 = time.monotonic()
logger.info(
f"[docling] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(converter.convert, file_path)
logger.info(
f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
return result.document.export_to_markdown()
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
result = await EtlPipelineService().extract(
EtlRequest(file_path=file_path, filename=filename)
)
return result.markdown_content
async def download_and_process_file(
@ -236,10 +157,14 @@ async def download_and_process_file(
file_name = file.get("name", "Unknown")
mime_type = file.get("mimeType", "")
# Skip folders and shortcuts
if should_skip_file(mime_type):
return None, f"Skipping {mime_type}", None
if not is_google_workspace_file(mime_type):
ext_skip, _unsup_ext = should_skip_by_extension(file_name)
if ext_skip:
return None, f"Skipping unsupported extension: {file_name}", None
logger.info(f"Downloading file: {file_name} ({mime_type})")
temp_file_path = None
@ -310,10 +235,13 @@ async def download_and_process_file(
"."
)[-1]
etl_filename = (
file_name + extension if is_google_workspace_file(mime_type) else file_name
)
logger.info(f"Processing {file_name} with Surfsense's file processor")
await process_file_in_background(
file_path=temp_file_path,
filename=file_name,
filename=etl_filename,
search_space_id=search_space_id,
user_id=user_id,
session=session,

View file

@ -1,5 +1,7 @@
"""File type handlers for Google Drive."""
from app.etl_pipeline.file_classifier import should_skip_for_service
GOOGLE_DOC = "application/vnd.google-apps.document"
GOOGLE_SHEET = "application/vnd.google-apps.spreadsheet"
GOOGLE_SLIDE = "application/vnd.google-apps.presentation"
@ -46,6 +48,21 @@ def should_skip_file(mime_type: str) -> bool:
return mime_type in [GOOGLE_FOLDER, GOOGLE_SHORTCUT]
def should_skip_by_extension(filename: str) -> tuple[bool, str | None]:
"""Check if the file extension is not parseable by the configured ETL service.
Returns (should_skip, unsupported_extension_or_None).
"""
from pathlib import PurePosixPath
from app.config import config as app_config
if should_skip_for_service(filename, app_config.ETL_SERVICE):
ext = PurePosixPath(filename).suffix.lower()
return True, ext
return False, None
def get_export_mime_type(mime_type: str) -> str | None:
"""Get export MIME type for Google Workspace files."""
return EXPORT_FORMATS.get(mime_type)

View file

@ -1,16 +1,9 @@
"""Content extraction for OneDrive files.
"""Content extraction for OneDrive files."""
Reuses the same ETL parsing logic as Google Drive since file parsing is
extension-based, not provider-specific.
"""
import asyncio
import contextlib
import logging
import os
import tempfile
import threading
import time
from pathlib import Path
from typing import Any
@ -31,7 +24,8 @@ async def download_and_extract_content(
item_id = file.get("id")
file_name = file.get("name", "Unknown")
if should_skip_file(file):
skip, _unsup_ext = should_skip_file(file)
if skip:
return None, {}, "Skipping non-indexable item"
file_info = file.get("file", {})
@ -84,98 +78,11 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
"""Parse a local file to markdown using the configured ETL service.
"""Parse a local file to markdown using the unified ETL pipeline."""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
Same logic as Google Drive -- file parsing is extension-based.
"""
lower = filename.lower()
if lower.endswith((".md", ".markdown", ".txt")):
with open(file_path, encoding="utf-8") as f:
return f.read()
if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
from litellm import atranscription
from app.config import config as app_config
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
t0 = time.monotonic()
logger.info(
f"[local-stt] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(stt_service.transcribe_file, file_path)
logger.info(
f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
text = result.get("text", "")
else:
with open(file_path, "rb") as audio_file:
kwargs: dict[str, Any] = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
resp = await atranscription(**kwargs)
text = resp.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
return f"# Transcription of {filename}\n\n{text}"
from app.config import config as app_config
if app_config.ETL_SERVICE == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
return await convert_document_to_markdown(docs)
if app_config.ETL_SERVICE == "LLAMACLOUD":
from app.tasks.document_processors.file_processors import (
parse_with_llamacloud_retry,
)
result = await parse_with_llamacloud_retry(
file_path=file_path, estimated_pages=50
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
return markdown_documents[0].text
if app_config.ETL_SERVICE == "DOCLING":
from docling.document_converter import DocumentConverter
converter = DocumentConverter()
t0 = time.monotonic()
logger.info(
f"[docling] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(converter.convert, file_path)
logger.info(
f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
return result.document.export_to_markdown()
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
result = await EtlPipelineService().extract(
EtlRequest(file_path=file_path, filename=filename)
)
return result.markdown_content

View file

@ -1,5 +1,7 @@
"""File type handlers for Microsoft OneDrive."""
from app.etl_pipeline.file_classifier import should_skip_for_service
ONEDRIVE_FOLDER_FACET = "folder"
ONENOTE_MIME = "application/msonenote"
@ -38,13 +40,28 @@ def is_folder(item: dict) -> bool:
return ONEDRIVE_FOLDER_FACET in item
def should_skip_file(item: dict) -> bool:
"""Skip folders, OneNote files, remote items (shared links), and packages."""
def should_skip_file(item: dict) -> tuple[bool, str | None]:
"""Skip folders, OneNote files, remote items, packages, and unsupported extensions.
Returns (should_skip, unsupported_extension_or_None).
The second element is only set when the skip is due to an unsupported extension.
"""
if is_folder(item):
return True
return True, None
if "remoteItem" in item:
return True
return True, None
if "package" in item:
return True
return True, None
mime = item.get("file", {}).get("mimeType", "")
return mime in SKIP_MIME_TYPES
if mime in SKIP_MIME_TYPES:
return True, None
from pathlib import PurePosixPath
from app.config import config as app_config
name = item.get("name", "")
if should_skip_for_service(name, app_config.ETL_SERVICE):
ext = PurePosixPath(name).suffix.lower()
return True, ext
return False, None

View file

@ -71,8 +71,10 @@ async def get_files_in_folder(
)
continue
files.extend(sub_files)
elif not should_skip_file(item):
files.append(item)
else:
skip, _unsup_ext = should_skip_file(item)
if not skip:
files.append(item)
return files, None

View file

@ -0,0 +1,39 @@
import ssl
import httpx
LLAMACLOUD_MAX_RETRIES = 5
LLAMACLOUD_BASE_DELAY = 10
LLAMACLOUD_MAX_DELAY = 120
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
ssl.SSLError,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadError,
httpx.ReadTimeout,
httpx.WriteError,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
httpx.LocalProtocolError,
ConnectionError,
ConnectionResetError,
TimeoutError,
OSError,
)
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024
MIN_UPLOAD_TIMEOUT = 120
MAX_UPLOAD_TIMEOUT = 1800
BASE_JOB_TIMEOUT = 600
PER_PAGE_JOB_TIMEOUT = 60
def calculate_upload_timeout(file_size_bytes: int) -> float:
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
return max(page_based_timeout, size_based_timeout)
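The timeout helpers are pure arithmetic; a worked example using the constants above (a 50 MiB upload with roughly 50 estimated pages):

# 50 MiB file, 50 estimated pages; values follow from the constants above.
size = 50 * 1024 * 1024
calculate_upload_timeout(size)   # (size / 100 KiB/s) * 1.5 = 768.0 s, inside [120, 1800]
calculate_job_timeout(50, size)  # max(600 + 50*60, 600 + 5*60) = 3600 s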

View file

@ -0,0 +1,21 @@
from pydantic import BaseModel, field_validator
class EtlRequest(BaseModel):
file_path: str
filename: str
estimated_pages: int = 0
@field_validator("filename")
@classmethod
def filename_must_not_be_empty(cls, v: str) -> str:
if not v.strip():
raise ValueError("filename must not be empty")
return v
class EtlResult(BaseModel):
markdown_content: str
etl_service: str
actual_pages: int = 0
content_type: str
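A small sketch of the validation implied by the field_validator above; pydantic surfaces the ValueError as a ValidationError at construction time (the paths are illustrative):

from pydantic import ValidationError

EtlRequest(file_path="/tmp/report.pdf", filename="report.pdf")  # accepted
try:
    EtlRequest(file_path="/tmp/report.pdf", filename="   ")
except ValidationError:
    pass  # "filename must not be empty" comes from the validator above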

View file

@ -0,0 +1,90 @@
from app.config import config as app_config
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
from app.etl_pipeline.exceptions import (
EtlServiceUnavailableError,
EtlUnsupportedFileError,
)
from app.etl_pipeline.file_classifier import FileCategory, classify_file
from app.etl_pipeline.parsers.audio import transcribe_audio
from app.etl_pipeline.parsers.direct_convert import convert_file_directly
from app.etl_pipeline.parsers.plaintext import read_plaintext
class EtlPipelineService:
"""Single pipeline for extracting markdown from files. All callers use this."""
async def extract(self, request: EtlRequest) -> EtlResult:
category = classify_file(request.filename)
if category == FileCategory.UNSUPPORTED:
raise EtlUnsupportedFileError(
f"File type not supported for parsing: {request.filename}"
)
if category == FileCategory.PLAINTEXT:
content = read_plaintext(request.file_path)
return EtlResult(
markdown_content=content,
etl_service="PLAINTEXT",
content_type="plaintext",
)
if category == FileCategory.DIRECT_CONVERT:
content = convert_file_directly(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="DIRECT_CONVERT",
content_type="direct_convert",
)
if category == FileCategory.AUDIO:
content = await transcribe_audio(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="AUDIO",
content_type="audio",
)
return await self._extract_document(request)
async def _extract_document(self, request: EtlRequest) -> EtlResult:
from pathlib import PurePosixPath
from app.utils.file_extensions import get_document_extensions_for_service
etl_service = app_config.ETL_SERVICE
if not etl_service:
raise EtlServiceUnavailableError(
"No ETL_SERVICE configured. "
"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
)
ext = PurePosixPath(request.filename).suffix.lower()
supported = get_document_extensions_for_service(etl_service)
if ext not in supported:
raise EtlUnsupportedFileError(
f"File type {ext} is not supported by {etl_service}"
)
if etl_service == "DOCLING":
from app.etl_pipeline.parsers.docling import parse_with_docling
content = await parse_with_docling(request.file_path, request.filename)
elif etl_service == "UNSTRUCTURED":
from app.etl_pipeline.parsers.unstructured import parse_with_unstructured
content = await parse_with_unstructured(request.file_path)
elif etl_service == "LLAMACLOUD":
from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud
content = await parse_with_llamacloud(
request.file_path, request.estimated_pages
)
else:
raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}")
return EtlResult(
markdown_content=content,
etl_service=etl_service,
content_type="document",
)
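The call pattern used by the connector content extractors elsewhere in this commit; the local path and filename are illustrative, and the call must run inside an async caller:

# Usage sketch matching the Dropbox / Google Drive / OneDrive call sites.
result = await EtlPipelineService().extract(
    EtlRequest(file_path="/tmp/downloaded.docx", filename="quarterly-report.docx")
)
markdown = result.markdown_content  # extracted markdown
parser = result.etl_service         # e.g. "PLAINTEXT", "AUDIO", or the configured document parser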

View file

@ -0,0 +1,10 @@
class EtlParseError(Exception):
"""Raised when an ETL parser fails to produce content."""
class EtlServiceUnavailableError(Exception):
"""Raised when the configured ETL_SERVICE is not recognised."""
class EtlUnsupportedFileError(Exception):
"""Raised when a file type cannot be parsed by any ETL pipeline."""

View file

@ -0,0 +1,137 @@
from enum import Enum
from pathlib import PurePosixPath
from app.utils.file_extensions import (
DOCUMENT_EXTENSIONS,
get_document_extensions_for_service,
)
PLAINTEXT_EXTENSIONS = frozenset(
{
".md",
".markdown",
".txt",
".text",
".json",
".jsonl",
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".xml",
".css",
".scss",
".less",
".sass",
".py",
".pyw",
".pyi",
".pyx",
".js",
".jsx",
".ts",
".tsx",
".mjs",
".cjs",
".java",
".kt",
".kts",
".scala",
".groovy",
".c",
".h",
".cpp",
".cxx",
".cc",
".hpp",
".hxx",
".cs",
".fs",
".fsx",
".go",
".rs",
".rb",
".php",
".pl",
".pm",
".lua",
".swift",
".m",
".mm",
".r",
".jl",
".sh",
".bash",
".zsh",
".fish",
".bat",
".cmd",
".ps1",
".sql",
".graphql",
".gql",
".env",
".gitignore",
".dockerignore",
".editorconfig",
".makefile",
".cmake",
".log",
".rst",
".tex",
".bib",
".org",
".adoc",
".asciidoc",
".vue",
".svelte",
".astro",
".tf",
".hcl",
".proto",
}
)
AUDIO_EXTENSIONS = frozenset(
{".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"}
)
DIRECT_CONVERT_EXTENSIONS = frozenset({".csv", ".tsv", ".html", ".htm", ".xhtml"})
class FileCategory(Enum):
PLAINTEXT = "plaintext"
AUDIO = "audio"
DIRECT_CONVERT = "direct_convert"
UNSUPPORTED = "unsupported"
DOCUMENT = "document"
def classify_file(filename: str) -> FileCategory:
suffix = PurePosixPath(filename).suffix.lower()
if suffix in PLAINTEXT_EXTENSIONS:
return FileCategory.PLAINTEXT
if suffix in AUDIO_EXTENSIONS:
return FileCategory.AUDIO
if suffix in DIRECT_CONVERT_EXTENSIONS:
return FileCategory.DIRECT_CONVERT
if suffix in DOCUMENT_EXTENSIONS:
return FileCategory.DOCUMENT
return FileCategory.UNSUPPORTED
def should_skip_for_service(filename: str, etl_service: str | None) -> bool:
"""Return True if *filename* cannot be processed by *etl_service*.
Plaintext, audio, and direct-convert files are parser-agnostic and never
skipped. Document files are checked against the per-parser extension set.
"""
category = classify_file(filename)
if category == FileCategory.UNSUPPORTED:
return True
if category == FileCategory.DOCUMENT:
suffix = PurePosixPath(filename).suffix.lower()
return suffix not in get_document_extensions_for_service(etl_service)
return False
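A short sketch of the classification and per-service skip behaviour; it assumes .pdf is in DOCLING's document extension set (via get_document_extensions_for_service) and .blend falls in no category:

assert classify_file("main.py") is FileCategory.PLAINTEXT
assert classify_file("call.m4a") is FileCategory.AUDIO
assert classify_file("data.csv") is FileCategory.DIRECT_CONVERT
assert classify_file("scene.blend") is FileCategory.UNSUPPORTED

assert not should_skip_for_service("main.py", "DOCLING")     # parser-agnostic, never skipped
assert not should_skip_for_service("report.pdf", "DOCLING")  # assuming .pdf is in DOCLING's set
assert should_skip_for_service("scene.blend", "DOCLING")     # unsupported everywhere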

View file

@ -0,0 +1,34 @@
from litellm import atranscription
from app.config import config as app_config
async def transcribe_audio(file_path: str, filename: str) -> str:
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
result = stt_service.transcribe_file(file_path)
text = result.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
else:
with open(file_path, "rb") as audio_file:
kwargs: dict = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
response = await atranscription(**kwargs)
text = response.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
return f"# Transcription of {filename}\n\n{text}"

View file

@ -0,0 +1,3 @@
from app.tasks.document_processors._direct_converters import convert_file_directly
__all__ = ["convert_file_directly"]

View file

@ -0,0 +1,26 @@
import warnings
from logging import ERROR, getLogger
async def parse_with_docling(file_path: str, filename: str) -> str:
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
pdfminer_logger = getLogger("pdfminer")
original_level = pdfminer_logger.level
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
warnings.filterwarnings(
"ignore", message=".*Cannot set gray non-stroke color.*"
)
warnings.filterwarnings("ignore", message=".*invalid float value.*")
pdfminer_logger.setLevel(ERROR)
try:
result = await docling_service.process_document(file_path, filename)
finally:
pdfminer_logger.setLevel(original_level)
return result["content"]

View file

@ -0,0 +1,123 @@
import asyncio
import logging
import os
import random
import httpx
from app.config import config as app_config
from app.etl_pipeline.constants import (
LLAMACLOUD_BASE_DELAY,
LLAMACLOUD_MAX_DELAY,
LLAMACLOUD_MAX_RETRIES,
LLAMACLOUD_RETRYABLE_EXCEPTIONS,
PER_PAGE_JOB_TIMEOUT,
calculate_job_timeout,
calculate_upload_timeout,
)
async def parse_with_llamacloud(file_path: str, estimated_pages: int) -> str:
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
file_size_bytes = os.path.getsize(file_path)
file_size_mb = file_size_bytes / (1024 * 1024)
upload_timeout = calculate_upload_timeout(file_size_bytes)
job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
custom_timeout = httpx.Timeout(
connect=120.0,
read=upload_timeout,
write=upload_timeout,
pool=120.0,
)
logging.info(
f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
f"job_timeout={job_timeout:.0f}s"
)
last_exception = None
attempt_errors: list[str] = []
for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
try:
async with httpx.AsyncClient(timeout=custom_timeout) as custom_client:
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1,
verbose=True,
language="en",
result_type=ResultType.MD,
max_timeout=int(max(2000, job_timeout + upload_timeout)),
job_timeout_in_seconds=job_timeout,
job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT,
custom_client=custom_client,
)
result = await parser.aparse(file_path)
if attempt > 1:
logging.info(
f"LlamaCloud upload succeeded on attempt {attempt} after "
f"{len(attempt_errors)} failures"
)
if hasattr(result, "get_markdown_documents"):
markdown_docs = result.get_markdown_documents(split_by_page=False)
if markdown_docs and hasattr(markdown_docs[0], "text"):
return markdown_docs[0].text
if hasattr(result, "pages") and result.pages:
return "\n\n".join(
p.md for p in result.pages if hasattr(p, "md") and p.md
)
return str(result)
if isinstance(result, list):
if result and hasattr(result[0], "text"):
return result[0].text
return "\n\n".join(
doc.page_content if hasattr(doc, "page_content") else str(doc)
for doc in result
)
return str(result)
except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
last_exception = e
error_type = type(e).__name__
error_msg = str(e)[:200]
attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}")
if attempt < LLAMACLOUD_MAX_RETRIES:
base_delay = min(
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
LLAMACLOUD_MAX_DELAY,
)
jitter = base_delay * 0.25 * (2 * random.random() - 1)
delay = base_delay + jitter
logging.warning(
f"LlamaCloud upload failed "
f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): "
f"{error_type}. File: {file_size_mb:.1f}MB. "
f"Retrying in {delay:.0f}s..."
)
await asyncio.sleep(delay)
else:
logging.error(
f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} "
f"attempts. File size: {file_size_mb:.1f}MB, "
f"Pages: {estimated_pages}. "
f"Errors: {'; '.join(attempt_errors)}"
)
except Exception:
raise
raise last_exception or RuntimeError(
f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. "
f"File size: {file_size_mb:.1f}MB"
)

View file

@ -0,0 +1,8 @@
def read_plaintext(file_path: str) -> str:
with open(file_path, encoding="utf-8", errors="replace") as f:
content = f.read()
if "\x00" in content:
raise ValueError(
f"File contains null bytes — likely a binary file opened as text: {file_path}"
)
return content

View file

@ -0,0 +1,14 @@
async def parse_with_unstructured(file_path: str) -> str:
from langchain_unstructured import UnstructuredLoader
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
return "\n\n".join(doc.page_content for doc in docs if doc.page_content)

View file

@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
@ -31,8 +31,11 @@ async def vision_autocomplete_stream(
return StreamingResponse(
stream_vision_autocomplete(
body.screenshot, body.search_space_id, session,
app_name=body.app_name, window_title=body.window_title,
body.screenshot,
body.search_space_id,
session,
app_name=body.app_name,
window_title=body.window_title,
),
media_type="text/event-stream",
headers={

View file

@ -311,9 +311,11 @@ async def dropbox_callback(
)
existing_cursor = db_connector.config.get("cursor")
existing_folder_cursors = db_connector.config.get("folder_cursors")
db_connector.config = {
**connector_config,
"cursor": existing_cursor,
"folder_cursors": existing_folder_cursors,
"auth_expired": False,
}
flag_modified(db_connector, "config")

View file

@ -2477,6 +2477,8 @@ async def run_google_drive_indexing(
stage="fetching",
)
total_unsupported = 0
# Index each folder with indexing options
for folder in items.folders:
try:
@ -2484,6 +2486,7 @@ async def run_google_drive_indexing(
indexed_count,
skipped_count,
error_message,
unsupported_count,
) = await index_google_drive_files(
session,
connector_id,
@ -2497,6 +2500,7 @@ async def run_google_drive_indexing(
include_subfolders=indexing_options.include_subfolders,
)
total_skipped += skipped_count
total_unsupported += unsupported_count
if error_message:
errors.append(f"Folder '{folder.name}': {error_message}")
else:
@ -2572,6 +2576,7 @@ async def run_google_drive_indexing(
indexed_count=total_indexed,
error_message=error_message,
skipped_count=total_skipped,
unsupported_count=total_unsupported,
)
except Exception as e:
@ -2642,7 +2647,12 @@ async def run_onedrive_indexing(
stage="fetching",
)
total_indexed, total_skipped, error_message = await index_onedrive_files(
(
total_indexed,
total_skipped,
error_message,
total_unsupported,
) = await index_onedrive_files(
session,
connector_id,
search_space_id,
@ -2683,6 +2693,7 @@ async def run_onedrive_indexing(
indexed_count=total_indexed,
error_message=error_message,
skipped_count=total_skipped,
unsupported_count=total_unsupported,
)
except Exception as e:
@ -2750,7 +2761,12 @@ async def run_dropbox_indexing(
stage="fetching",
)
total_indexed, total_skipped, error_message = await index_dropbox_files(
(
total_indexed,
total_skipped,
error_message,
total_unsupported,
) = await index_dropbox_files(
session,
connector_id,
search_space_id,
@ -2791,6 +2807,7 @@ async def run_dropbox_indexing(
indexed_count=total_indexed,
error_message=error_message,
skipped_count=total_skipped,
unsupported_count=total_unsupported,
)
except Exception as e:

View file

@ -111,9 +111,8 @@ class DoclingService:
pipeline_options=pipeline_options, backend=PyPdfiumDocumentBackend
)
# Initialize DocumentConverter
self.converter = DocumentConverter(
format_options={InputFormat.PDF: pdf_format_option}
format_options={InputFormat.PDF: pdf_format_option},
)
acceleration_type = "GPU (WSL2)" if self.use_gpu else "CPU"

View file

@ -421,6 +421,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
error_message: str | None = None,
is_warning: bool = False,
skipped_count: int | None = None,
unsupported_count: int | None = None,
) -> Notification:
"""
Update notification when connector indexing completes.
@ -428,10 +429,11 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
Args:
session: Database session
notification: Notification to update
indexed_count: Total number of items indexed
indexed_count: Total number of files indexed
error_message: Error message if indexing failed, or warning message (optional)
is_warning: If True, treat error_message as a warning (success case) rather than an error
skipped_count: Number of items skipped (e.g., duplicates) - optional
skipped_count: Number of files skipped (e.g., unchanged) - optional
unsupported_count: Number of files skipped because the ETL parser doesn't support them
Returns:
Updated notification
@ -440,52 +442,45 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"connector_name", "Connector"
)
# Build the skipped text if there are skipped items
skipped_text = ""
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
skipped_text = (
f" ({skipped_count} {skipped_item_text} skipped - already indexed)"
)
unsupported_text = ""
if unsupported_count and unsupported_count > 0:
file_word = "file was" if unsupported_count == 1 else "files were"
unsupported_text = f" {unsupported_count} {file_word} not supported."
# If there's an error message but items were indexed, treat it as a warning (partial success)
# If is_warning is True, treat it as success even with 0 items (e.g., duplicates found)
# Otherwise, treat it as a failure
if error_message:
if indexed_count > 0:
# Partial success with warnings (e.g., duplicate content from other connectors)
title = f"Ready: {connector_name}"
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}"
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}"
status = "completed"
elif is_warning:
# Warning case (e.g., duplicates found) - treat as success
title = f"Ready: {connector_name}"
message = f"Sync completed{skipped_text}. {error_message}"
message = f"Sync complete.{unsupported_text} {error_message}"
status = "completed"
else:
# Complete failure
title = f"Failed: {connector_name}"
message = f"Sync failed: {error_message}"
if unsupported_text:
message += unsupported_text
status = "failed"
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)."
if unsupported_count and unsupported_count > 0:
message = f"Sync complete.{unsupported_text}"
else:
message = "Already up to date! No new items to sync."
message = "Already up to date!"
else:
item_text = "item" if indexed_count == 1 else "items"
message = (
f"Now searchable! {indexed_count} {item_text} synced{skipped_text}."
)
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced."
if unsupported_text:
message += unsupported_text
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"skipped_count": skipped_count or 0,
"unsupported_count": unsupported_count or 0,
"sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0)
else "failed",

View file

@ -8,7 +8,7 @@ Optimized pipeline:
"""
import logging
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
from langchain_core.messages import HumanMessage
from sqlalchemy.ext.asyncio import AsyncSession

View file

@ -51,7 +51,10 @@ async def _should_skip_file(
file_id = file.get("id", "")
file_name = file.get("name", "Unknown")
if skip_item(file):
skip, unsup_ext = skip_item(file)
if skip:
if unsup_ext:
return True, f"unsupported:{unsup_ext}"
return True, "folder/non-downloadable"
if not file_id:
return True, "missing file_id"
@ -251,6 +254,121 @@ async def _download_and_index(
return batch_indexed, download_failed + batch_failed
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):
"""Remove a document that was deleted in Dropbox."""
primary_hash = compute_identifier_hash(
DocumentType.DROPBOX_FILE.value, file_id, search_space_id
)
existing = await check_document_by_unique_identifier(session, primary_hash)
if not existing:
result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.DROPBOX_FILE,
cast(Document.document_metadata["dropbox_file_id"], String) == file_id,
)
)
existing = result.scalar_one_or_none()
if existing:
await session.delete(existing)
async def _index_with_delta_sync(
dropbox_client: DropboxClient,
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
cursor: str,
task_logger: TaskLoggingService,
log_entry: object,
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int, int, str]:
"""Delta sync using Dropbox cursor-based change tracking.
Returns (indexed_count, skipped_count, unsupported_count, new_cursor).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting delta sync from cursor: {cursor[:20]}...",
{"stage": "delta_sync", "cursor_prefix": cursor[:20]},
)
entries, new_cursor, error = await dropbox_client.get_changes(cursor)
if error:
err_lower = error.lower()
if "401" in error or "authentication expired" in err_lower:
raise Exception(
f"Dropbox authentication failed. Please re-authenticate. (Error: {error})"
)
raise Exception(f"Failed to fetch Dropbox changes: {error}")
if not entries:
logger.info("No changes detected since last sync")
return 0, 0, 0, new_cursor or cursor
logger.info(f"Processing {len(entries)} change entries")
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
files_processed = 0
for entry in entries:
if files_processed >= max_files:
break
files_processed += 1
tag = entry.get(".tag")
if tag == "deleted":
path_lower = entry.get("path_lower", "")
name = entry.get("name", "")
file_id = entry.get("id", "")
if file_id:
await _remove_document(session, file_id, search_space_id)
logger.debug(f"Processed deletion: {name or path_lower}")
continue
if tag != "file":
continue
skip, msg = await _should_skip_file(session, entry, search_space_id)
if skip:
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
continue
files_to_download.append(entry)
batch_indexed, failed = await _download_and_index(
dropbox_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
)
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped, unsupported_count, new_cursor or cursor
async def _index_full_scan(
dropbox_client: DropboxClient,
session: AsyncSession,
@ -266,8 +384,11 @@ async def _index_full_scan(
incremental_sync: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Full scan indexing of a folder."""
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
@ -287,6 +408,7 @@ async def _index_full_scan(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
all_files, error = await get_files_in_folder(
@ -306,14 +428,21 @@ async def _index_full_scan(
if incremental_sync:
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
continue
elif skip_item(file):
skipped += 1
continue
else:
item_skip, item_unsup = skip_item(file)
if item_skip:
if item_unsup:
unsupported_count += 1
else:
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
@ -352,9 +481,10 @@ async def _index_full_scan(
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Full scan complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
async def _index_selected_files(
@ -368,7 +498,7 @@ async def _index_selected_files(
enable_summary: bool,
incremental_sync: bool = True,
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
) -> tuple[int, int, int, list[str]]:
"""Index user-selected files using the parallel pipeline."""
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
@ -379,6 +509,7 @@ async def _index_selected_files(
errors: list[str] = []
renamed_count = 0
skipped = 0
unsupported_count = 0
for file_path, file_name in file_paths:
file, error = await get_file_by_path(dropbox_client, file_path)
@ -390,14 +521,21 @@ async def _index_selected_files(
if incremental_sync:
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
continue
elif skip_item(file):
skipped += 1
continue
else:
item_skip, item_unsup = skip_item(file)
if item_skip:
if item_unsup:
unsupported_count += 1
else:
skipped += 1
continue
file_pages = PageLimitService.estimate_pages_from_metadata(
file.get("name", ""), file.get("size")
@ -429,7 +567,7 @@ async def _index_selected_files(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
return renamed_count + batch_indexed, skipped, unsupported_count, errors
async def index_dropbox_files(
@ -438,7 +576,7 @@ async def index_dropbox_files(
search_space_id: int,
user_id: str,
items_dict: dict,
) -> tuple[int, int, str | None]:
) -> tuple[int, int, str | None, int]:
"""Index Dropbox files for a specific connector.
items_dict format:
@ -469,7 +607,7 @@ async def index_dropbox_files(
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
@ -480,7 +618,7 @@ async def index_dropbox_files(
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
dropbox_client = DropboxClient(session, connector_id)
@ -489,9 +627,13 @@ async def index_dropbox_files(
max_files = indexing_options.get("max_files", 500)
incremental_sync = indexing_options.get("incremental_sync", True)
include_subfolders = indexing_options.get("include_subfolders", True)
use_delta_sync = indexing_options.get("use_delta_sync", True)
folder_cursors: dict = connector.config.get("folder_cursors", {})
total_indexed = 0
total_skipped = 0
total_unsupported = 0
selected_files = items_dict.get("files", [])
if selected_files:
@ -499,7 +641,7 @@ async def index_dropbox_files(
(f.get("path", f.get("path_lower", f.get("id", ""))), f.get("name"))
for f in selected_files
]
indexed, skipped, file_errors = await _index_selected_files(
indexed, skipped, unsupported, file_errors = await _index_selected_files(
dropbox_client,
session,
file_tuples,
@ -511,6 +653,7 @@ async def index_dropbox_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsupported
if file_errors:
logger.warning(
f"File indexing errors for connector {connector_id}: {file_errors}"
@ -523,25 +666,66 @@ async def index_dropbox_files(
)
folder_name = folder.get("name", "Root")
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
dropbox_client,
session,
connector_id,
search_space_id,
user_id,
folder_path,
folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
incremental_sync=incremental_sync,
enable_summary=connector_enable_summary,
saved_cursor = folder_cursors.get(folder_path)
can_use_delta = (
use_delta_sync and saved_cursor and connector.last_indexed_at
)
if can_use_delta:
logger.info(f"Using delta sync for folder {folder_name}")
indexed, skipped, unsup, new_cursor = await _index_with_delta_sync(
dropbox_client,
session,
connector_id,
search_space_id,
user_id,
saved_cursor,
task_logger,
log_entry,
max_files,
enable_summary=connector_enable_summary,
)
folder_cursors[folder_path] = new_cursor
total_unsupported += unsup
else:
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped, unsup = await _index_full_scan(
dropbox_client,
session,
connector_id,
search_space_id,
user_id,
folder_path,
folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
incremental_sync=incremental_sync,
enable_summary=connector_enable_summary,
)
total_unsupported += unsup
total_indexed += indexed
total_skipped += skipped
# Persist latest cursor for this folder
try:
latest_cursor, cursor_err = await dropbox_client.get_latest_cursor(
folder_path
)
if latest_cursor and not cursor_err:
folder_cursors[folder_path] = latest_cursor
except Exception as e:
logger.warning(f"Failed to get latest cursor for {folder_path}: {e}")
# Persist folder cursors to connector config
if folders:
cfg = dict(connector.config)
cfg["folder_cursors"] = folder_cursors
connector.config = cfg
flag_modified(connector, "config")
if total_indexed > 0 or folders:
await update_connector_last_indexed(session, connector, True)
@ -550,12 +734,18 @@ async def index_dropbox_files(
await task_logger.log_task_success(
log_entry,
f"Successfully completed Dropbox indexing for connector {connector_id}",
{"files_processed": total_indexed, "files_skipped": total_skipped},
{
"files_processed": total_indexed,
"files_skipped": total_skipped,
"files_unsupported": total_unsupported,
},
)
logger.info(
f"Dropbox indexing completed: {total_indexed} indexed, {total_skipped} skipped"
f"Dropbox indexing completed: {total_indexed} indexed, "
f"{total_skipped} skipped, {total_unsupported} unsupported"
)
return total_indexed, total_skipped, None
return total_indexed, total_skipped, None, total_unsupported
except SQLAlchemyError as db_error:
await session.rollback()
@ -566,7 +756,7 @@ async def index_dropbox_files(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}", 0
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -576,4 +766,4 @@ async def index_dropbox_files(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Dropbox files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index Dropbox files: {e!s}"
return 0, 0, f"Failed to index Dropbox files: {e!s}", 0

View file

@ -25,7 +25,11 @@ from app.connectors.google_drive import (
get_files_in_folder,
get_start_page_token,
)
from app.connectors.google_drive.file_types import should_skip_file as skip_mime
from app.connectors.google_drive.file_types import (
is_google_workspace_file,
should_skip_by_extension,
should_skip_file as skip_mime,
)
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
@ -78,6 +82,10 @@ async def _should_skip_file(
if skip_mime(mime_type):
return True, "folder/shortcut"
if not is_google_workspace_file(mime_type):
ext_skip, unsup_ext = should_skip_by_extension(file_name)
if ext_skip:
return True, f"unsupported:{unsup_ext}"
if not file_id:
return True, "missing file_id"
@ -468,13 +476,13 @@ async def _index_selected_files(
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
) -> tuple[int, int, int, list[str]]:
"""Index user-selected files using the parallel pipeline.
Phase 1 (serial): fetch metadata + skip checks.
Phase 2+3 (parallel): download, ETL, index via _download_and_index.
Returns (indexed_count, skipped_count, errors).
Returns (indexed_count, skipped_count, unsupported_count, errors).
"""
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
@ -485,6 +493,7 @@ async def _index_selected_files(
errors: list[str] = []
renamed_count = 0
skipped = 0
unsupported_count = 0
for file_id, file_name in file_ids:
file, error = await get_file_by_id(drive_client, file_id)
@ -495,7 +504,9 @@ async def _index_selected_files(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -539,7 +550,7 @@ async def _index_selected_files(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
return renamed_count + batch_indexed, skipped, unsupported_count, errors
# ---------------------------------------------------------------------------
@ -562,8 +573,11 @@ async def _index_full_scan(
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Full scan indexing of a folder."""
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
@ -585,6 +599,7 @@ async def _index_full_scan(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_processed = 0
files_to_download: list[dict] = []
folders_to_process = [(folder_id, folder_name)]
@ -625,7 +640,9 @@ async def _index_full_scan(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -698,9 +715,10 @@ async def _index_full_scan(
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Full scan complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
async def _index_with_delta_sync(
@ -718,8 +736,11 @@ async def _index_with_delta_sync(
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Delta sync using change tracking."""
) -> tuple[int, int, int]:
"""Delta sync using change tracking.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting delta sync from token: {start_page_token[:20]}...",
@ -739,7 +760,7 @@ async def _index_with_delta_sync(
if not changes:
logger.info("No changes detected since last sync")
return 0, 0
return 0, 0, 0
logger.info(f"Processing {len(changes)} changes")
@ -754,6 +775,7 @@ async def _index_with_delta_sync(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
files_processed = 0
@ -775,7 +797,9 @@ async def _index_with_delta_sync(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@ -832,9 +856,10 @@ async def _index_with_delta_sync(
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Delta sync complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
# ---------------------------------------------------------------------------
@ -854,8 +879,11 @@ async def index_google_drive_files(
max_files: int = 500,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, str | None]:
"""Index Google Drive files for a specific connector."""
) -> tuple[int, int, str | None, int]:
"""Index Google Drive files for a specific connector.
Returns (indexed, skipped, error_or_none, unsupported_count).
"""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="google_drive_files_indexing",
@ -881,7 +909,7 @@ async def index_google_drive_files(
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
await task_logger.log_task_progress(
log_entry,
@ -900,7 +928,7 @@ async def index_google_drive_files(
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
@ -915,6 +943,7 @@ async def index_google_drive_files(
0,
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
0,
)
connector_enable_summary = getattr(connector, "enable_summary", True)
@ -927,7 +956,7 @@ async def index_google_drive_files(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingParameter"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
target_folder_id = folder_id
target_folder_name = folder_name or "Selected Folder"
@ -938,9 +967,11 @@ async def index_google_drive_files(
use_delta_sync and start_page_token and connector.last_indexed_at
)
documents_unsupported = 0
if can_use_delta:
logger.info(f"Using delta sync for connector {connector_id}")
documents_indexed, documents_skipped = await _index_with_delta_sync(
documents_indexed, documents_skipped, du = await _index_with_delta_sync(
drive_client,
session,
connector,
@ -956,8 +987,9 @@ async def index_google_drive_files(
on_heartbeat_callback,
connector_enable_summary,
)
documents_unsupported += du
logger.info("Running reconciliation scan after delta sync")
ri, rs = await _index_full_scan(
ri, rs, ru = await _index_full_scan(
drive_client,
session,
connector,
@ -975,9 +1007,14 @@ async def index_google_drive_files(
)
documents_indexed += ri
documents_skipped += rs
documents_unsupported += ru
else:
logger.info(f"Using full scan for connector {connector_id}")
documents_indexed, documents_skipped = await _index_full_scan(
(
documents_indexed,
documents_skipped,
documents_unsupported,
) = await _index_full_scan(
drive_client,
session,
connector,
@ -1012,14 +1049,17 @@ async def index_google_drive_files(
{
"files_processed": documents_indexed,
"files_skipped": documents_skipped,
"files_unsupported": documents_unsupported,
"sync_type": "delta" if can_use_delta else "full",
"folder": target_folder_name,
},
)
logger.info(
f"Google Drive indexing completed: {documents_indexed} indexed, {documents_skipped} skipped"
f"Google Drive indexing completed: {documents_indexed} indexed, "
f"{documents_skipped} skipped, {documents_unsupported} unsupported"
)
return documents_indexed, documents_skipped, None
return documents_indexed, documents_skipped, None, documents_unsupported
except SQLAlchemyError as db_error:
await session.rollback()
@ -1030,7 +1070,7 @@ async def index_google_drive_files(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}", 0
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -1040,7 +1080,7 @@ async def index_google_drive_files(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index Google Drive files: {e!s}"
return 0, 0, f"Failed to index Google Drive files: {e!s}", 0
async def index_google_drive_single_file(
@ -1242,7 +1282,7 @@ async def index_google_drive_selected_files(
session, connector_id, credentials=pre_built_credentials
)
indexed, skipped, errors = await _index_selected_files(
indexed, skipped, unsupported, errors = await _index_selected_files(
drive_client,
session,
files,
@@ -1253,6 +1293,11 @@ async def index_google_drive_selected_files(
on_heartbeat=on_heartbeat_callback,
)
if unsupported > 0:
file_text = "file was" if unsupported == 1 else "files were"
unsup_msg = f"{unsupported} {file_text} not supported"
errors.append(unsup_msg)
await session.commit()
if errors:
@@ -1260,7 +1305,12 @@ async def index_google_drive_selected_files(
log_entry,
f"Batch file indexing completed with {len(errors)} error(s)",
"; ".join(errors),
{"indexed": indexed, "skipped": skipped, "error_count": len(errors)},
{
"indexed": indexed,
"skipped": skipped,
"unsupported": unsupported,
"error_count": len(errors),
},
)
else:
await task_logger.log_task_success(

View file

@@ -23,7 +23,6 @@ from sqlalchemy import select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.db import (
Document,
DocumentStatus,
@@ -44,132 +43,6 @@ from .base import (
logger,
)
PLAINTEXT_EXTENSIONS = frozenset(
{
".md",
".markdown",
".txt",
".text",
".json",
".jsonl",
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".xml",
".css",
".scss",
".less",
".sass",
".py",
".pyw",
".pyi",
".pyx",
".js",
".jsx",
".ts",
".tsx",
".mjs",
".cjs",
".java",
".kt",
".kts",
".scala",
".groovy",
".c",
".h",
".cpp",
".cxx",
".cc",
".hpp",
".hxx",
".cs",
".fs",
".fsx",
".go",
".rs",
".rb",
".php",
".pl",
".pm",
".lua",
".swift",
".m",
".mm",
".r",
".R",
".jl",
".sh",
".bash",
".zsh",
".fish",
".bat",
".cmd",
".ps1",
".sql",
".graphql",
".gql",
".env",
".gitignore",
".dockerignore",
".editorconfig",
".makefile",
".cmake",
".log",
".rst",
".tex",
".bib",
".org",
".adoc",
".asciidoc",
".vue",
".svelte",
".astro",
".tf",
".hcl",
".proto",
}
)
AUDIO_EXTENSIONS = frozenset(
{
".mp3",
".mp4",
".mpeg",
".mpga",
".m4a",
".wav",
".webm",
}
)
DIRECT_CONVERT_EXTENSIONS = frozenset({".csv", ".tsv", ".html", ".htm"})
def _is_plaintext_file(filename: str) -> bool:
return Path(filename).suffix.lower() in PLAINTEXT_EXTENSIONS
def _is_audio_file(filename: str) -> bool:
return Path(filename).suffix.lower() in AUDIO_EXTENSIONS
def _is_direct_convert_file(filename: str) -> bool:
return Path(filename).suffix.lower() in DIRECT_CONVERT_EXTENSIONS
def _needs_etl(filename: str) -> bool:
"""File is not plaintext, not audio, and not direct-convert — requires ETL."""
return (
not _is_plaintext_file(filename)
and not _is_audio_file(filename)
and not _is_direct_convert_file(filename)
)
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
@@ -279,57 +152,19 @@ def scan_folder(
return files
def _read_plaintext_file(file_path: str) -> str:
"""Read a plaintext/text-based file as UTF-8."""
with open(file_path, encoding="utf-8", errors="replace") as f:
content = f.read()
if "\x00" in content:
raise ValueError(
f"File contains null bytes — likely a binary file opened as text: {file_path}"
)
return content
async def _read_file_content(file_path: str, filename: str) -> str:
"""Read file content, using ETL for binary formats.
"""Read file content via the unified ETL pipeline.
Plaintext files are read directly. Audio and document files (PDF, DOCX, etc.)
are routed through the configured ETL service (same as Google Drive / OneDrive).
Raises ValueError if the file cannot be parsed (e.g. no ETL service configured
for a binary file).
All file types (plaintext, audio, direct-convert, document) are handled
by ``EtlPipelineService``.
"""
if _is_plaintext_file(filename):
return _read_plaintext_file(file_path)
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
if _is_direct_convert_file(filename):
from app.tasks.document_processors._direct_converters import (
convert_file_directly,
)
return convert_file_directly(file_path, filename)
if _is_audio_file(filename):
etl_service = config.ETL_SERVICE if hasattr(config, "ETL_SERVICE") else None
stt_service_val = config.STT_SERVICE if hasattr(config, "STT_SERVICE") else None
if not stt_service_val and not etl_service:
raise ValueError(
f"No STT_SERVICE configured — cannot transcribe audio file: {filename}"
)
if _needs_etl(filename):
etl_service = getattr(config, "ETL_SERVICE", None)
if not etl_service:
raise ValueError(
f"No ETL_SERVICE configured — cannot parse binary file: {filename}. "
f"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
)
from app.connectors.onedrive.content_extractor import (
_parse_file_to_markdown,
result = await EtlPipelineService().extract(
EtlRequest(file_path=file_path, filename=filename)
)
return await _parse_file_to_markdown(file_path, filename)
return result.markdown_content
def _content_hash(content: str, search_space_id: int) -> str:

View file

@@ -56,7 +56,10 @@ async def _should_skip_file(
file_id = file.get("id")
file_name = file.get("name", "Unknown")
if skip_item(file):
skip, unsup_ext = skip_item(file)
if skip:
if unsup_ext:
return True, f"unsupported:{unsup_ext}"
return True, "folder/onenote/remote"
if not file_id:
return True, "missing file_id"
@@ -290,7 +293,7 @@ async def _index_selected_files(
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
) -> tuple[int, int, int, list[str]]:
"""Index user-selected files using the parallel pipeline."""
page_limit_service = PageLimitService(session)
pages_used, pages_limit = await page_limit_service.get_page_usage(user_id)
@@ -301,6 +304,7 @@ async def _index_selected_files(
errors: list[str] = []
renamed_count = 0
skipped = 0
unsupported_count = 0
for file_id, file_name in file_ids:
file, error = await get_file_by_id(onedrive_client, file_id)
@@ -311,7 +315,9 @@ async def _index_selected_files(
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@@ -347,7 +353,7 @@ async def _index_selected_files(
user_id, pages_to_deduct, allow_exceed=True
)
return renamed_count + batch_indexed, skipped, errors
return renamed_count + batch_indexed, skipped, unsupported_count, errors
# ---------------------------------------------------------------------------
@@ -369,8 +375,11 @@ async def _index_full_scan(
include_subfolders: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Full scan indexing of a folder."""
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
Returns (indexed, skipped, unsupported_count).
"""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
@@ -389,6 +398,7 @@ async def _index_full_scan(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
all_files, error = await get_files_in_folder(
@@ -407,7 +417,9 @@ async def _index_full_scan(
for file in all_files[:max_files]:
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@@ -450,9 +462,10 @@ async def _index_full_scan(
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Full scan complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped
return indexed, skipped, unsupported_count
async def _index_with_delta_sync(
@@ -468,8 +481,11 @@ async def _index_with_delta_sync(
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int, str | None]:
"""Delta sync using OneDrive change tracking. Returns (indexed, skipped, new_delta_link)."""
) -> tuple[int, int, int, str | None]:
"""Delta sync using OneDrive change tracking.
Returns (indexed, skipped, unsupported_count, new_delta_link).
"""
await task_logger.log_task_progress(
log_entry,
"Starting delta sync",
@@ -489,7 +505,7 @@ async def _index_with_delta_sync(
if not changes:
logger.info("No changes detected since last sync")
return 0, 0, new_delta_link
return 0, 0, 0, new_delta_link
logger.info(f"Processing {len(changes)} delta changes")
@@ -501,6 +517,7 @@ async def _index_with_delta_sync(
renamed_count = 0
skipped = 0
unsupported_count = 0
files_to_download: list[dict] = []
files_processed = 0
@@ -523,7 +540,9 @@ async def _index_with_delta_sync(
skip, msg = await _should_skip_file(session, change, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
if msg and msg.startswith("unsupported:"):
unsupported_count += 1
elif msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
@@ -566,9 +585,10 @@ async def _index_with_delta_sync(
indexed = renamed_count + batch_indexed
logger.info(
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
f"Delta sync complete: {indexed} indexed, {skipped} skipped, "
f"{unsupported_count} unsupported, {failed} failed"
)
return indexed, skipped, new_delta_link
return indexed, skipped, unsupported_count, new_delta_link
# ---------------------------------------------------------------------------
@@ -582,7 +602,7 @@ async def index_onedrive_files(
search_space_id: int,
user_id: str,
items_dict: dict,
) -> tuple[int, int, str | None]:
) -> tuple[int, int, str | None, int]:
"""Index OneDrive files for a specific connector.
items_dict format:
@@ -609,7 +629,7 @@ async def index_onedrive_files(
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
@@ -620,7 +640,7 @@ async def index_onedrive_files(
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, error_msg
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
onedrive_client = OneDriveClient(session, connector_id)
@@ -632,12 +652,13 @@ async def index_onedrive_files(
total_indexed = 0
total_skipped = 0
total_unsupported = 0
# Index selected individual files
selected_files = items_dict.get("files", [])
if selected_files:
file_tuples = [(f["id"], f.get("name")) for f in selected_files]
indexed, skipped, _errors = await _index_selected_files(
indexed, skipped, unsupported, _errors = await _index_selected_files(
onedrive_client,
session,
file_tuples,
@@ -648,6 +669,7 @@ async def index_onedrive_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsupported
# Index selected folders
folders = items_dict.get("folders", [])
@@ -661,7 +683,7 @@ async def index_onedrive_files(
if can_use_delta:
logger.info(f"Using delta sync for folder {folder_name}")
indexed, skipped, new_delta_link = await _index_with_delta_sync(
indexed, skipped, unsup, new_delta_link = await _index_with_delta_sync(
onedrive_client,
session,
connector_id,
@@ -676,6 +698,7 @@ async def index_onedrive_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsup
if new_delta_link:
await session.refresh(connector)
@@ -685,7 +708,7 @@ async def index_onedrive_files(
flag_modified(connector, "config")
# Reconciliation full scan
ri, rs = await _index_full_scan(
ri, rs, ru = await _index_full_scan(
onedrive_client,
session,
connector_id,
@@ -701,9 +724,10 @@ async def index_onedrive_files(
)
total_indexed += ri
total_skipped += rs
total_unsupported += ru
else:
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
indexed, skipped, unsup = await _index_full_scan(
onedrive_client,
session,
connector_id,
@@ -719,6 +743,7 @@ async def index_onedrive_files(
)
total_indexed += indexed
total_skipped += skipped
total_unsupported += unsup
# Store new delta link for this folder
_, new_delta_link, _ = await onedrive_client.get_delta(folder_id=folder_id)
@@ -737,12 +762,18 @@ async def index_onedrive_files(
await task_logger.log_task_success(
log_entry,
f"Successfully completed OneDrive indexing for connector {connector_id}",
{"files_processed": total_indexed, "files_skipped": total_skipped},
{
"files_processed": total_indexed,
"files_skipped": total_skipped,
"files_unsupported": total_unsupported,
},
)
logger.info(
f"OneDrive indexing completed: {total_indexed} indexed, {total_skipped} skipped"
f"OneDrive indexing completed: {total_indexed} indexed, "
f"{total_skipped} skipped, {total_unsupported} unsupported"
)
return total_indexed, total_skipped, None
return total_indexed, total_skipped, None, total_unsupported
except SQLAlchemyError as db_error:
await session.rollback()
@@ -753,7 +784,7 @@ async def index_onedrive_files(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}", 0
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@@ -763,4 +794,4 @@ async def index_onedrive_files(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index OneDrive files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index OneDrive files: {e!s}"
return 0, 0, f"Failed to index OneDrive files: {e!s}", 0

View file

@@ -1,41 +1,17 @@
"""
Document processors module for background tasks.
This module provides a collection of document processors for different content types
and sources. Each processor is responsible for handling a specific type of document
processing task in the background.
Available processors:
- Extension processor: Handle documents from browser extension
- Markdown processor: Process markdown files
- File processors: Handle files using different ETL services (Unstructured, LlamaCloud, Docling)
- YouTube processor: Process YouTube videos and extract transcripts
Content extraction is handled by ``app.etl_pipeline.EtlPipelineService``.
This package keeps orchestration (save, notify, page-limit) and
non-ETL processors (extension, markdown, youtube).
"""
# Extension processor
# File processors (backward-compatible re-exports from _save)
from ._save import (
add_received_file_document_using_docling,
add_received_file_document_using_llamacloud,
add_received_file_document_using_unstructured,
)
from .extension_processor import add_extension_received_document
# Markdown processor
from .markdown_processor import add_received_markdown_file_document
# YouTube processor
from .youtube_processor import add_youtube_video_document
__all__ = [
# Extension processing
"add_extension_received_document",
# File processing with different ETL services
"add_received_file_document_using_docling",
"add_received_file_document_using_llamacloud",
"add_received_file_document_using_unstructured",
# Markdown file processing
"add_received_markdown_file_document",
# YouTube video processing
"add_youtube_video_document",
]
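# Usage sketch (illustrative, not part of this changeset): the re-exports above keep
# legacy import sites working even though the wrapper bodies now live in _save. The
# package path matches the app.tasks.document_processors imports used elsewhere in
# this changeset.
from app.tasks.document_processors import (
    add_received_file_document_using_docling,  # thin wrapper over save_file_document
    add_youtube_video_document,
)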

View file

@@ -1,74 +0,0 @@
"""
Constants for file document processing.
Centralizes file type classification, LlamaCloud retry configuration,
and timeout calculation parameters.
"""
import ssl
from enum import Enum
import httpx
# ---------------------------------------------------------------------------
# File type classification
# ---------------------------------------------------------------------------
MARKDOWN_EXTENSIONS = (".md", ".markdown", ".txt")
AUDIO_EXTENSIONS = (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
DIRECT_CONVERT_EXTENSIONS = (".csv", ".tsv", ".html", ".htm")
class FileCategory(Enum):
MARKDOWN = "markdown"
AUDIO = "audio"
DIRECT_CONVERT = "direct_convert"
DOCUMENT = "document"
def classify_file(filename: str) -> FileCategory:
"""Classify a file by its extension into a processing category."""
lower = filename.lower()
if lower.endswith(MARKDOWN_EXTENSIONS):
return FileCategory.MARKDOWN
if lower.endswith(AUDIO_EXTENSIONS):
return FileCategory.AUDIO
if lower.endswith(DIRECT_CONVERT_EXTENSIONS):
return FileCategory.DIRECT_CONVERT
return FileCategory.DOCUMENT
# ---------------------------------------------------------------------------
# LlamaCloud retry configuration
# ---------------------------------------------------------------------------
LLAMACLOUD_MAX_RETRIES = 5
LLAMACLOUD_BASE_DELAY = 10 # seconds (exponential backoff base)
LLAMACLOUD_MAX_DELAY = 120 # max delay between retries (2 minutes)
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
ssl.SSLError,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadError,
httpx.ReadTimeout,
httpx.WriteError,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
httpx.LocalProtocolError,
ConnectionError,
ConnectionResetError,
TimeoutError,
OSError,
)
# ---------------------------------------------------------------------------
# Timeout calculation constants
# ---------------------------------------------------------------------------
UPLOAD_BYTES_PER_SECOND_SLOW = (
100 * 1024
) # 100 KB/s (conservative for slow connections)
MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file
MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files
BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing
PER_PAGE_JOB_TIMEOUT = 60 # 1 minute per page for processing

View file

@@ -4,8 +4,8 @@ Lossless file-to-markdown converters for text-based formats.
These converters handle file types that can be faithfully represented as
markdown without any external ETL/OCR service:
- CSV / TSV → markdown table (stdlib ``csv``)
- HTML / HTM → markdown (``markdownify``)
- CSV / TSV → markdown table (stdlib ``csv``)
- HTML / HTM / XHTML → markdown (``markdownify``)
"""
from __future__ import annotations
@@ -73,6 +73,7 @@ _CONVERTER_MAP: dict[str, Callable[..., str]] = {
".tsv": tsv_to_markdown,
".html": html_to_markdown,
".htm": html_to_markdown,
".xhtml": html_to_markdown,
}
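# Illustrative sketch of the lossless CSV-to-markdown-table conversion described in
# the module docstring above, using only the stdlib csv module. This is a hypothetical
# stand-in, not the module's actual csv/tsv converter:
import csv
import io


def _csv_to_markdown_table_sketch(text: str, delimiter: str = ",") -> str:
    rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    if not rows:
        return ""
    header, *body = rows
    lines = [
        "| " + " | ".join(header) + " |",
        "| " + " | ".join("---" for _ in header) + " |",
    ]
    lines.extend("| " + " | ".join(row) + " |" for row in body)
    return "\n".join(lines)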

View file

@@ -1,209 +0,0 @@
"""
ETL parsing strategies for different document processing services.
Provides parse functions for Unstructured, LlamaCloud, and Docling, along with
LlamaCloud retry logic and dynamic timeout calculations.
"""
import asyncio
import logging
import os
import random
import warnings
from logging import ERROR, getLogger
import httpx
from app.config import config as app_config
from app.db import Log
from app.services.task_logging_service import TaskLoggingService
from ._constants import (
LLAMACLOUD_BASE_DELAY,
LLAMACLOUD_MAX_DELAY,
LLAMACLOUD_MAX_RETRIES,
LLAMACLOUD_RETRYABLE_EXCEPTIONS,
PER_PAGE_JOB_TIMEOUT,
)
from ._helpers import calculate_job_timeout, calculate_upload_timeout
# ---------------------------------------------------------------------------
# LlamaCloud parsing with retry
# ---------------------------------------------------------------------------
async def parse_with_llamacloud_retry(
file_path: str,
estimated_pages: int,
task_logger: TaskLoggingService | None = None,
log_entry: Log | None = None,
):
"""
Parse a file with LlamaCloud, retrying on transient SSL/connection errors.
Uses dynamic timeout calculations based on file size and page count to handle
very large files reliably.
Returns:
LlamaParse result object
Raises:
Exception: If all retries fail
"""
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
file_size_bytes = os.path.getsize(file_path)
file_size_mb = file_size_bytes / (1024 * 1024)
upload_timeout = calculate_upload_timeout(file_size_bytes)
job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
custom_timeout = httpx.Timeout(
connect=120.0,
read=upload_timeout,
write=upload_timeout,
pool=120.0,
)
logging.info(
f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
f"job_timeout={job_timeout:.0f}s"
)
last_exception = None
attempt_errors: list[str] = []
for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
try:
async with httpx.AsyncClient(timeout=custom_timeout) as custom_client:
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1,
verbose=True,
language="en",
result_type=ResultType.MD,
max_timeout=int(max(2000, job_timeout + upload_timeout)),
job_timeout_in_seconds=job_timeout,
job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT,
custom_client=custom_client,
)
result = await parser.aparse(file_path)
if attempt > 1:
logging.info(
f"LlamaCloud upload succeeded on attempt {attempt} after "
f"{len(attempt_errors)} failures"
)
return result
except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
last_exception = e
error_type = type(e).__name__
error_msg = str(e)[:200]
attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}")
if attempt < LLAMACLOUD_MAX_RETRIES:
base_delay = min(
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
LLAMACLOUD_MAX_DELAY,
)
jitter = base_delay * 0.25 * (2 * random.random() - 1)
delay = base_delay + jitter
if task_logger and log_entry:
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud upload failed "
f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), "
f"retrying in {delay:.0f}s",
{
"error_type": error_type,
"error_message": error_msg,
"attempt": attempt,
"retry_delay": delay,
"file_size_mb": round(file_size_mb, 1),
"upload_timeout": upload_timeout,
},
)
else:
logging.warning(
f"LlamaCloud upload failed "
f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): "
f"{error_type}. File: {file_size_mb:.1f}MB. "
f"Retrying in {delay:.0f}s..."
)
await asyncio.sleep(delay)
else:
logging.error(
f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} "
f"attempts. File size: {file_size_mb:.1f}MB, "
f"Pages: {estimated_pages}. "
f"Errors: {'; '.join(attempt_errors)}"
)
except Exception:
raise
raise last_exception or RuntimeError(
f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. "
f"File size: {file_size_mb:.1f}MB"
)
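# Worked example of the backoff schedule above, using the _constants values
# (LLAMACLOUD_BASE_DELAY = 10 s, LLAMACLOUD_MAX_DELAY = 120 s, +/- 25 % jitter):
#   attempt 1 fails -> base_delay = min(10 * 2**0, 120) = 10 s, sleep 7.5 to 12.5 s
#   attempt 2 fails -> base_delay = min(10 * 2**1, 120) = 20 s, sleep 15 to 25 s
#   attempt 4 fails -> base_delay = min(10 * 2**3, 120) = 80 s, sleep 60 to 100 s
#   attempt 5 fails -> no further sleep; the last exception propagates via the raise above.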
# ---------------------------------------------------------------------------
# Per-service parse functions
# ---------------------------------------------------------------------------
async def parse_with_unstructured(file_path: str):
"""
Parse a file using the Unstructured ETL service.
Returns:
List of LangChain Document elements.
"""
from langchain_unstructured import UnstructuredLoader
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
return await loader.aload()
async def parse_with_docling(file_path: str, filename: str) -> str:
"""
Parse a file using the Docling ETL service (via the Docling service wrapper).
Returns:
Markdown content string.
"""
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
pdfminer_logger = getLogger("pdfminer")
original_level = pdfminer_logger.level
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
warnings.filterwarnings(
"ignore", message=".*Cannot set gray non-stroke color.*"
)
warnings.filterwarnings("ignore", message=".*invalid float value.*")
pdfminer_logger.setLevel(ERROR)
try:
result = await docling_service.process_document(file_path, filename)
finally:
pdfminer_logger.setLevel(original_level)
return result["content"]

View file

@@ -11,13 +11,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType
from app.utils.document_converters import generate_unique_identifier_hash
from ._constants import (
BASE_JOB_TIMEOUT,
MAX_UPLOAD_TIMEOUT,
MIN_UPLOAD_TIMEOUT,
PER_PAGE_JOB_TIMEOUT,
UPLOAD_BYTES_PER_SECOND_SLOW,
)
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
@@ -198,21 +191,3 @@ async def update_document_from_connector(
if "connector_id" in connector:
document.connector_id = connector["connector_id"]
await session.commit()
# ---------------------------------------------------------------------------
# Timeout calculations
# ---------------------------------------------------------------------------
def calculate_upload_timeout(file_size_bytes: int) -> float:
"""Calculate upload timeout based on file size (conservative for slow connections)."""
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
"""Calculate job processing timeout based on page count and file size."""
page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
return max(page_based_timeout, size_based_timeout)
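# Worked example for the two formulas above, with the constants from _constants
# (100 KiB/s upload rate, 120 s / 1800 s clamps, 600 s base job timeout, 60 s per page):
#   upload, 50 MiB file: (50 * 1024 * 1024 / (100 * 1024)) * 1.5 = 768 s, inside the clamp
#   job, 200 pages / 50 MiB: page-based 600 + 200 * 60 = 12,600 s,
#       size-based 600 + (50 MiB / 10 MiB) * 60 = 900 s, so 12,600 s is used.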

View file

@@ -1,14 +1,9 @@
"""
Unified document save/update logic for file processors.
Replaces the three nearly-identical ``add_received_file_document_using_*``
functions with a single ``save_file_document`` function plus thin wrappers
for backward compatibility.
"""
import logging
from langchain_core.documents import Document as LangChainDocument
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
@@ -207,79 +202,3 @@ async def save_file_document(
raise RuntimeError(
f"Failed to process file document using {etl_service}: {e!s}"
) from e
# ---------------------------------------------------------------------------
# Backward-compatible wrapper functions
# ---------------------------------------------------------------------------
async def add_received_file_document_using_unstructured(
session: AsyncSession,
file_name: str,
unstructured_processed_elements: list[LangChainDocument],
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store a file document using the Unstructured service."""
from app.utils.document_converters import convert_document_to_markdown
markdown_content = await convert_document_to_markdown(
unstructured_processed_elements
)
return await save_file_document(
session,
file_name,
markdown_content,
search_space_id,
user_id,
"UNSTRUCTURED",
connector,
enable_summary,
)
async def add_received_file_document_using_llamacloud(
session: AsyncSession,
file_name: str,
llamacloud_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store document content parsed by LlamaCloud."""
return await save_file_document(
session,
file_name,
llamacloud_markdown_document,
search_space_id,
user_id,
"LLAMACLOUD",
connector,
enable_summary,
)
async def add_received_file_document_using_docling(
session: AsyncSession,
file_name: str,
docling_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store document content parsed by Docling."""
return await save_file_document(
session,
file_name,
docling_markdown_document,
search_space_id,
user_id,
"DOCLING",
connector,
enable_summary,
)

View file

@@ -1,14 +1,8 @@
"""
File document processors orchestrating content extraction and indexing.
This module is the public entry point for file processing. It delegates to
specialised sub-modules that each own a single concern:
- ``_constants`` → file type classification and configuration constants
- ``_helpers`` → document deduplication, migration, connector helpers
- ``_direct_converters`` → lossless file-to-markdown for csv/tsv/html
- ``_etl`` → ETL parsing strategies (Unstructured, LlamaCloud, Docling)
- ``_save`` → unified document creation / update logic
Delegates content extraction to ``app.etl_pipeline.EtlPipelineService`` and
keeps only orchestration concerns (notifications, logging, page limits, saving).
"""
from __future__ import annotations
@@ -17,38 +11,19 @@ import contextlib
import logging
import os
from dataclasses import dataclass, field
from logging import ERROR, getLogger
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config as app_config
from app.db import Document, Log, Notification
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from ._constants import FileCategory, classify_file
from ._direct_converters import convert_file_directly
from ._etl import (
parse_with_docling,
parse_with_llamacloud_retry,
parse_with_unstructured,
)
from ._helpers import update_document_from_connector
from ._save import (
add_received_file_document_using_docling,
add_received_file_document_using_llamacloud,
add_received_file_document_using_unstructured,
save_file_document,
)
from ._save import save_file_document
from .markdown_processor import add_received_markdown_file_document
# Re-export public API so existing ``from file_processors import …`` keeps working.
__all__ = [
"add_received_file_document_using_docling",
"add_received_file_document_using_llamacloud",
"add_received_file_document_using_unstructured",
"parse_with_llamacloud_retry",
"process_file_in_background",
"process_file_in_background_with_document",
"save_file_document",
@@ -142,35 +117,31 @@ async def _log_page_divergence(
# ===================================================================
async def _process_markdown_upload(ctx: _ProcessingContext) -> Document | None:
"""Read a markdown / text file and create or update a document."""
await _notify(ctx, "parsing", "Reading file")
async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Extract content from a non-document file (plaintext/direct_convert/audio) via the unified ETL pipeline."""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
await _notify(ctx, "parsing", "Processing file")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing markdown/text file: {ctx.filename}",
{"file_type": "markdown", "processing_stage": "reading_file"},
f"Processing file: {ctx.filename}",
{"processing_stage": "extracting"},
)
with open(ctx.file_path, encoding="utf-8") as f:
markdown_content = f.read()
etl_result = await EtlPipelineService().extract(
EtlRequest(file_path=ctx.file_path, filename=ctx.filename)
)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
await _notify(ctx, "chunking")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Creating document from markdown content: {ctx.filename}",
{
"processing_stage": "creating_document",
"content_length": len(markdown_content),
},
)
result = await add_received_markdown_file_document(
ctx.session,
ctx.filename,
markdown_content,
etl_result.markdown_content,
ctx.search_space_id,
ctx.user_id,
ctx.connector,
@@ -181,179 +152,19 @@ async def _process_markdown_upload(ctx: _ProcessingContext) -> Document | None:
if result:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed markdown file: {ctx.filename}",
f"Successfully processed file: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "markdown",
"file_type": etl_result.content_type,
"etl_service": etl_result.etl_service,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Markdown file already exists (duplicate): {ctx.filename}",
{"duplicate_detected": True, "file_type": "markdown"},
)
return result
async def _process_direct_convert_upload(ctx: _ProcessingContext) -> Document | None:
"""Convert a text-based file (csv/tsv/html) to markdown without ETL."""
await _notify(ctx, "parsing", "Converting file")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Direct-converting file to markdown: {ctx.filename}",
{"file_type": "direct_convert", "processing_stage": "converting"},
)
markdown_content = convert_file_directly(ctx.file_path, ctx.filename)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
await _notify(ctx, "chunking")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Creating document from converted content: {ctx.filename}",
{
"processing_stage": "creating_document",
"content_length": len(markdown_content),
},
)
result = await add_received_markdown_file_document(
ctx.session,
ctx.filename,
markdown_content,
ctx.search_space_id,
ctx.user_id,
ctx.connector,
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
if result:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully direct-converted file: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "direct_convert",
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Direct-converted file already exists (duplicate): {ctx.filename}",
{"duplicate_detected": True, "file_type": "direct_convert"},
)
return result
async def _process_audio_upload(ctx: _ProcessingContext) -> Document | None:
"""Transcribe an audio file and create or update a document."""
await _notify(ctx, "parsing", "Transcribing audio")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing audio file for transcription: {ctx.filename}",
{"file_type": "audio", "processing_stage": "starting_transcription"},
)
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
try:
stt_result = stt_service.transcribe_file(ctx.file_path)
transcribed_text = stt_result.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
transcribed_text = (
f"# Transcription of {ctx.filename}\n\n{transcribed_text}"
)
except Exception as e:
raise HTTPException(
status_code=422,
detail=f"Failed to transcribe audio file {ctx.filename}: {e!s}",
) from e
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Local STT transcription completed: {ctx.filename}",
{
"processing_stage": "local_transcription_complete",
"language": stt_result.get("language"),
"confidence": stt_result.get("language_probability"),
"duration": stt_result.get("duration"),
},
)
else:
from litellm import atranscription
with open(ctx.file_path, "rb") as audio_file:
transcription_kwargs: dict = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
transcription_kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
transcription_response = await atranscription(**transcription_kwargs)
transcribed_text = transcription_response.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
transcribed_text = f"# Transcription of {ctx.filename}\n\n{transcribed_text}"
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Transcription completed, creating document: {ctx.filename}",
{
"processing_stage": "transcription_complete",
"transcript_length": len(transcribed_text),
},
)
await _notify(ctx, "chunking")
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
result = await add_received_markdown_file_document(
ctx.session,
ctx.filename,
transcribed_text,
ctx.search_space_id,
ctx.user_id,
ctx.connector,
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
if result:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully transcribed and processed audio file: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "audio",
"transcript_length": len(transcribed_text),
"stt_service": stt_service_type,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Audio file transcript already exists (duplicate): {ctx.filename}",
{"duplicate_detected": True, "file_type": "audio"},
f"File already exists (duplicate): {ctx.filename}",
{"duplicate_detected": True, "file_type": etl_result.content_type},
)
return result
@@ -363,279 +174,10 @@ async def _process_audio_upload(ctx: _ProcessingContext) -> Document | None:
# ---------------------------------------------------------------------------
async def _etl_unstructured(
ctx: _ProcessingContext,
page_limit_service,
estimated_pages: int,
) -> Document | None:
"""Parse and save via the Unstructured ETL service."""
await _notify(ctx, "parsing", "Extracting content")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing file with Unstructured ETL: {ctx.filename}",
{
"file_type": "document",
"etl_service": "UNSTRUCTURED",
"processing_stage": "loading",
},
)
docs = await parse_with_unstructured(ctx.file_path)
await _notify(ctx, "chunking", chunks_count=len(docs))
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Unstructured ETL completed, creating document: {ctx.filename}",
{"processing_stage": "etl_complete", "elements_count": len(docs)},
)
actual_pages = page_limit_service.estimate_pages_from_elements(docs)
final_pages = max(estimated_pages, actual_pages)
await _log_page_divergence(
ctx.task_logger,
ctx.log_entry,
ctx.filename,
estimated_pages,
actual_pages,
final_pages,
)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
result = await add_received_file_document_using_unstructured(
ctx.session,
ctx.filename,
docs,
ctx.search_space_id,
ctx.user_id,
ctx.connector,
enable_summary=ctx.enable_summary,
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
if result:
await page_limit_service.update_page_usage(
ctx.user_id, final_pages, allow_exceed=True
)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file with Unstructured: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
"pages_processed": final_pages,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
},
)
return result
async def _etl_llamacloud(
ctx: _ProcessingContext,
page_limit_service,
estimated_pages: int,
) -> Document | None:
"""Parse and save via the LlamaCloud ETL service."""
await _notify(ctx, "parsing", "Extracting content")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing file with LlamaCloud ETL: {ctx.filename}",
{
"file_type": "document",
"etl_service": "LLAMACLOUD",
"processing_stage": "parsing",
"estimated_pages": estimated_pages,
},
)
raw_result = await parse_with_llamacloud_retry(
file_path=ctx.file_path,
estimated_pages=estimated_pages,
task_logger=ctx.task_logger,
log_entry=ctx.log_entry,
)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
markdown_documents = await raw_result.aget_markdown_documents(split_by_page=False)
await _notify(ctx, "chunking", chunks_count=len(markdown_documents))
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"LlamaCloud parsing completed, creating documents: {ctx.filename}",
{
"processing_stage": "parsing_complete",
"documents_count": len(markdown_documents),
},
)
if not markdown_documents:
await ctx.task_logger.log_task_failure(
ctx.log_entry,
f"LlamaCloud parsing returned no documents: {ctx.filename}",
"ETL service returned empty document list",
{"error_type": "EmptyDocumentList", "etl_service": "LLAMACLOUD"},
)
raise ValueError(f"LlamaCloud parsing returned no documents for {ctx.filename}")
actual_pages = page_limit_service.estimate_pages_from_markdown(markdown_documents)
final_pages = max(estimated_pages, actual_pages)
await _log_page_divergence(
ctx.task_logger,
ctx.log_entry,
ctx.filename,
estimated_pages,
actual_pages,
final_pages,
)
any_created = False
last_doc: Document | None = None
for doc in markdown_documents:
doc_result = await add_received_file_document_using_llamacloud(
ctx.session,
ctx.filename,
llamacloud_markdown_document=doc.text,
search_space_id=ctx.search_space_id,
user_id=ctx.user_id,
connector=ctx.connector,
enable_summary=ctx.enable_summary,
)
if doc_result:
any_created = True
last_doc = doc_result
if any_created:
await page_limit_service.update_page_usage(
ctx.user_id, final_pages, allow_exceed=True
)
if ctx.connector:
await update_document_from_connector(last_doc, ctx.connector, ctx.session)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file with LlamaCloud: {ctx.filename}",
{
"document_id": last_doc.id,
"content_hash": last_doc.content_hash,
"file_type": "document",
"etl_service": "LLAMACLOUD",
"pages_processed": final_pages,
"documents_count": len(markdown_documents),
},
)
return last_doc
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "LLAMACLOUD",
"documents_count": len(markdown_documents),
},
)
return None
async def _etl_docling(
ctx: _ProcessingContext,
page_limit_service,
estimated_pages: int,
) -> Document | None:
"""Parse and save via the Docling ETL service."""
await _notify(ctx, "parsing", "Extracting content")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing file with Docling ETL: {ctx.filename}",
{
"file_type": "document",
"etl_service": "DOCLING",
"processing_stage": "parsing",
},
)
content = await parse_with_docling(ctx.file_path, ctx.filename)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Docling parsing completed, creating document: {ctx.filename}",
{"processing_stage": "parsing_complete", "content_length": len(content)},
)
actual_pages = page_limit_service.estimate_pages_from_content_length(len(content))
final_pages = max(estimated_pages, actual_pages)
await _log_page_divergence(
ctx.task_logger,
ctx.log_entry,
ctx.filename,
estimated_pages,
actual_pages,
final_pages,
)
await _notify(ctx, "chunking")
result = await add_received_file_document_using_docling(
ctx.session,
ctx.filename,
docling_markdown_document=content,
search_space_id=ctx.search_space_id,
user_id=ctx.user_id,
connector=ctx.connector,
enable_summary=ctx.enable_summary,
)
if result:
await page_limit_service.update_page_usage(
ctx.user_id, final_pages, allow_exceed=True
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file with Docling: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": "DOCLING",
"pages_processed": final_pages,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "DOCLING",
},
)
return result
async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Route a document file to the configured ETL service."""
"""Route a document file to the configured ETL service via the unified pipeline."""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
page_limit_service = PageLimitService(ctx.session)
@@ -665,16 +207,60 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
os.unlink(ctx.file_path)
raise HTTPException(status_code=403, detail=str(e)) from e
etl_dispatch = {
"UNSTRUCTURED": _etl_unstructured,
"LLAMACLOUD": _etl_llamacloud,
"DOCLING": _etl_docling,
}
handler = etl_dispatch.get(app_config.ETL_SERVICE)
if handler is None:
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
await _notify(ctx, "parsing", "Extracting content")
return await handler(ctx, page_limit_service, estimated_pages)
etl_result = await EtlPipelineService().extract(
EtlRequest(
file_path=ctx.file_path,
filename=ctx.filename,
estimated_pages=estimated_pages,
)
)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
await _notify(ctx, "chunking")
result = await save_file_document(
ctx.session,
ctx.filename,
etl_result.markdown_content,
ctx.search_space_id,
ctx.user_id,
etl_result.etl_service,
ctx.connector,
enable_summary=ctx.enable_summary,
)
if result:
await page_limit_service.update_page_usage(
ctx.user_id, estimated_pages, allow_exceed=True
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": etl_result.etl_service,
"pages_processed": estimated_pages,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": etl_result.etl_service,
},
)
return result
# ===================================================================
@@ -706,15 +292,16 @@ async def process_file_in_background(
)
try:
category = classify_file(filename)
from app.etl_pipeline.file_classifier import (
FileCategory as EtlFileCategory,
classify_file as etl_classify,
)
if category == FileCategory.MARKDOWN:
return await _process_markdown_upload(ctx)
if category == FileCategory.DIRECT_CONVERT:
return await _process_direct_convert_upload(ctx)
if category == FileCategory.AUDIO:
return await _process_audio_upload(ctx)
return await _process_document_upload(ctx)
category = etl_classify(filename)
if category == EtlFileCategory.DOCUMENT:
return await _process_document_upload(ctx)
return await _process_non_document_upload(ctx)
except Exception as e:
await session.rollback()
@@ -758,201 +345,64 @@ async def _extract_file_content(
Returns:
Tuple of (markdown_content, etl_service_name).
"""
category = classify_file(filename)
if category == FileCategory.MARKDOWN:
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Reading file",
)
await task_logger.log_task_progress(
log_entry,
f"Processing markdown/text file: {filename}",
{"file_type": "markdown", "processing_stage": "reading_file"},
)
with open(file_path, encoding="utf-8") as f:
content = f.read()
with contextlib.suppress(Exception):
os.unlink(file_path)
return content, "MARKDOWN"
if category == FileCategory.DIRECT_CONVERT:
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Converting file",
)
await task_logger.log_task_progress(
log_entry,
f"Direct-converting file to markdown: {filename}",
{"file_type": "direct_convert", "processing_stage": "converting"},
)
content = convert_file_directly(file_path, filename)
with contextlib.suppress(Exception):
os.unlink(file_path)
return content, "DIRECT_CONVERT"
if category == FileCategory.AUDIO:
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Transcribing audio",
)
await task_logger.log_task_progress(
log_entry,
f"Processing audio file for transcription: {filename}",
{"file_type": "audio", "processing_stage": "starting_transcription"},
)
transcribed_text = await _transcribe_audio(file_path, filename)
with contextlib.suppress(Exception):
os.unlink(file_path)
return transcribed_text, "AUDIO_TRANSCRIPTION"
# Document file — use ETL service
return await _extract_document_content(
file_path,
filename,
session,
user_id,
task_logger,
log_entry,
notification,
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.etl_pipeline.file_classifier import (
FileCategory,
classify_file as etl_classify,
)
async def _transcribe_audio(file_path: str, filename: str) -> str:
"""Transcribe an audio file and return formatted markdown text."""
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
result = stt_service.transcribe_file(file_path)
text = result.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
else:
from litellm import atranscription
with open(file_path, "rb") as audio_file:
kwargs: dict = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
response = await atranscription(**kwargs)
text = response.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
return f"# Transcription of {filename}\n\n{text}"
async def _extract_document_content(
file_path: str,
filename: str,
session: AsyncSession,
user_id: str,
task_logger: TaskLoggingService,
log_entry: Log,
notification: Notification | None,
) -> tuple[str, str]:
"""
Parse a document file via the configured ETL service.
Returns:
Tuple of (markdown_content, etl_service_name).
"""
from app.services.page_limit_service import PageLimitService
page_limit_service = PageLimitService(session)
try:
estimated_pages = page_limit_service.estimate_pages_before_processing(file_path)
except Exception:
file_size = os.path.getsize(file_path)
estimated_pages = max(1, file_size // (80 * 1024))
await page_limit_service.check_page_limit(user_id, estimated_pages)
etl_service = app_config.ETL_SERVICE
markdown_content: str | None = None
category = etl_classify(filename)
estimated_pages = 0
if notification:
stage_messages = {
FileCategory.PLAINTEXT: "Reading file",
FileCategory.DIRECT_CONVERT: "Converting file",
FileCategory.AUDIO: "Transcribing audio",
FileCategory.UNSUPPORTED: "Unsupported file type",
FileCategory.DOCUMENT: "Extracting content",
}
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Extracting content",
stage_message=stage_messages.get(category, "Processing"),
)
if etl_service == "UNSTRUCTURED":
from app.utils.document_converters import convert_document_to_markdown
await task_logger.log_task_progress(
log_entry,
f"Processing {category.value} file: {filename}",
{"file_type": category.value, "processing_stage": "extracting"},
)
docs = await parse_with_unstructured(file_path)
markdown_content = await convert_document_to_markdown(docs)
actual_pages = page_limit_service.estimate_pages_from_elements(docs)
final_pages = max(estimated_pages, actual_pages)
await page_limit_service.update_page_usage(
user_id, final_pages, allow_exceed=True
)
if category == FileCategory.DOCUMENT:
from app.services.page_limit_service import PageLimitService
elif etl_service == "LLAMACLOUD":
raw_result = await parse_with_llamacloud_retry(
page_limit_service = PageLimitService(session)
estimated_pages = _estimate_pages_safe(page_limit_service, file_path)
await page_limit_service.check_page_limit(user_id, estimated_pages)
result = await EtlPipelineService().extract(
EtlRequest(
file_path=file_path,
filename=filename,
estimated_pages=estimated_pages,
task_logger=task_logger,
log_entry=log_entry,
)
markdown_documents = await raw_result.aget_markdown_documents(
split_by_page=False
)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud parsing returned no documents: {filename}")
markdown_content = markdown_documents[0].text
)
if category == FileCategory.DOCUMENT:
await page_limit_service.update_page_usage(
user_id, estimated_pages, allow_exceed=True
)
elif etl_service == "DOCLING":
getLogger("docling.pipeline.base_pipeline").setLevel(ERROR)
getLogger("docling.document_converter").setLevel(ERROR)
getLogger("docling_core.transforms.chunker.hierarchical_chunker").setLevel(
ERROR
)
from docling.document_converter import DocumentConverter
converter = DocumentConverter()
result = converter.convert(file_path)
markdown_content = result.document.export_to_markdown()
await page_limit_service.update_page_usage(
user_id, estimated_pages, allow_exceed=True
)
else:
raise RuntimeError(f"Unknown ETL_SERVICE: {etl_service}")
with contextlib.suppress(Exception):
os.unlink(file_path)
if not markdown_content:
if not result.markdown_content:
raise RuntimeError(f"Failed to extract content from file: {filename}")
return markdown_content, etl_service
return result.markdown_content, result.etl_service
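# _estimate_pages_safe() is referenced above but defined outside this hunk. A plausible
# sketch, assuming it merely wraps the try/except fallback the removed inline code used
# (roughly one page per 80 KiB when estimation fails); the real body is not shown here:
def _estimate_pages_safe(page_limit_service, file_path: str) -> int:
    try:
        return page_limit_service.estimate_pages_before_processing(file_path)
    except Exception:
        file_size = os.path.getsize(file_path)
        return max(1, file_size // (80 * 1024))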
async def process_file_in_background_with_document(

View file

@@ -0,0 +1,124 @@
"""Per-parser document extension sets for the ETL pipeline.
Every consumer (file_classifier, connector-level skip checks, ETL pipeline
validation) imports from here so there is a single source of truth.
Extensions already covered by PLAINTEXT_EXTENSIONS, AUDIO_EXTENSIONS, or
DIRECT_CONVERT_EXTENSIONS in file_classifier are NOT repeated here -- these
sets are exclusively for the "document" ETL path (Docling / LlamaParse /
Unstructured).
"""
from pathlib import PurePosixPath
# ---------------------------------------------------------------------------
# Per-parser document extension sets (from official documentation)
# ---------------------------------------------------------------------------
DOCLING_DOCUMENT_EXTENSIONS: frozenset[str] = frozenset(
{
".pdf",
".docx",
".xlsx",
".pptx",
".png",
".jpg",
".jpeg",
".tiff",
".tif",
".bmp",
".webp",
}
)
LLAMAPARSE_DOCUMENT_EXTENSIONS: frozenset[str] = frozenset(
{
".pdf",
".docx",
".doc",
".xlsx",
".xls",
".pptx",
".ppt",
".docm",
".dot",
".dotm",
".pptm",
".pot",
".potx",
".xlsm",
".xlsb",
".xlw",
".rtf",
".epub",
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".tiff",
".tif",
".webp",
".svg",
".odt",
".ods",
".odp",
".hwp",
".hwpx",
}
)
UNSTRUCTURED_DOCUMENT_EXTENSIONS: frozenset[str] = frozenset(
{
".pdf",
".docx",
".doc",
".xlsx",
".xls",
".pptx",
".ppt",
".png",
".jpg",
".jpeg",
".bmp",
".tiff",
".tif",
".heic",
".rtf",
".epub",
".odt",
".eml",
".msg",
".p7s",
}
)
# ---------------------------------------------------------------------------
# Union (used by classify_file for routing) + service lookup
# ---------------------------------------------------------------------------
DOCUMENT_EXTENSIONS: frozenset[str] = (
DOCLING_DOCUMENT_EXTENSIONS
| LLAMAPARSE_DOCUMENT_EXTENSIONS
| UNSTRUCTURED_DOCUMENT_EXTENSIONS
)
_SERVICE_MAP: dict[str, frozenset[str]] = {
"DOCLING": DOCLING_DOCUMENT_EXTENSIONS,
"LLAMACLOUD": LLAMAPARSE_DOCUMENT_EXTENSIONS,
"UNSTRUCTURED": UNSTRUCTURED_DOCUMENT_EXTENSIONS,
}
def get_document_extensions_for_service(etl_service: str | None) -> frozenset[str]:
"""Return the document extensions supported by *etl_service*.
Falls back to the full union when the service is ``None`` or unknown.
"""
return _SERVICE_MAP.get(etl_service or "", DOCUMENT_EXTENSIONS)
def is_supported_document_extension(filename: str) -> bool:
"""Return True if the file's extension is in the supported document set."""
suffix = PurePosixPath(filename).suffix.lower()
return suffix in DOCUMENT_EXTENSIONS
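# Usage sketch (not part of the module): how a connector-level skip check might
# consult these sets; the ETL_SERVICE string values match the _SERVICE_MAP keys above.
supported = get_document_extensions_for_service("DOCLING")
assert ".pdf" in supported and ".epub" not in supported
assert is_supported_document_extension("slides.PPTX")      # suffix lookup is case-insensitive
assert not is_supported_document_extension("archive.zip")  # not a document extension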