refactor: streamline document upload limits and enhance handling of mentioned documents

- Updated maximum file size limit to 500 MB per file.
- Removed restrictions on the number of files per upload and total upload size.
- Enhanced handling of user-mentioned documents in the knowledge base search middleware.
- Improved document reading and processing logic to support the new features and optimizations.
DESKTOP-RTLN3BA\$punk 2026-04-02 19:39:10 -07:00
parent 6727266107
commit 62e698d8aa
33 changed files with 2889 additions and 2443 deletions


@@ -12,16 +12,14 @@ Available processors:
- YouTube processor: Process YouTube videos and extract transcripts
"""
# URL crawler
# Extension processor
from .extension_processor import add_extension_received_document
# File processors
from .file_processors import (
# File processors (backward-compatible re-exports from _save)
from ._save import (
add_received_file_document_using_docling,
add_received_file_document_using_llamacloud,
add_received_file_document_using_unstructured,
)
from .extension_processor import add_extension_received_document
# Markdown processor
from .markdown_processor import add_received_markdown_file_document
@@ -32,9 +30,9 @@ from .youtube_processor import add_youtube_video_document
__all__ = [
# Extension processing
"add_extension_received_document",
# File processing with different ETL services
"add_received_file_document_using_docling",
"add_received_file_document_using_llamacloud",
# File processing with different ETL services
"add_received_file_document_using_unstructured",
# Markdown file processing
"add_received_markdown_file_document",


@@ -0,0 +1,74 @@
"""
Constants for file document processing.
Centralizes file type classification, LlamaCloud retry configuration,
and timeout calculation parameters.
"""
import ssl
from enum import Enum
import httpx
# ---------------------------------------------------------------------------
# File type classification
# ---------------------------------------------------------------------------
MARKDOWN_EXTENSIONS = (".md", ".markdown", ".txt")
AUDIO_EXTENSIONS = (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
DIRECT_CONVERT_EXTENSIONS = (".csv", ".tsv", ".html", ".htm")
class FileCategory(Enum):
MARKDOWN = "markdown"
AUDIO = "audio"
DIRECT_CONVERT = "direct_convert"
DOCUMENT = "document"
def classify_file(filename: str) -> FileCategory:
"""Classify a file by its extension into a processing category."""
lower = filename.lower()
if lower.endswith(MARKDOWN_EXTENSIONS):
return FileCategory.MARKDOWN
if lower.endswith(AUDIO_EXTENSIONS):
return FileCategory.AUDIO
if lower.endswith(DIRECT_CONVERT_EXTENSIONS):
return FileCategory.DIRECT_CONVERT
return FileCategory.DOCUMENT
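# Illustrative usage — a sketch, not part of this commit. Classification is
# purely extension-based, so the mapping follows directly from the tuples above:
#
#   classify_file("notes.md")   -> FileCategory.MARKDOWN   (.txt also lands here)
#   classify_file("call.m4a")   -> FileCategory.AUDIO
#   classify_file("data.tsv")   -> FileCategory.DIRECT_CONVERT
#   classify_file("report.pdf") -> FileCategory.DOCUMENT   (fallback)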
# ---------------------------------------------------------------------------
# LlamaCloud retry configuration
# ---------------------------------------------------------------------------
LLAMACLOUD_MAX_RETRIES = 5
LLAMACLOUD_BASE_DELAY = 10 # seconds (exponential backoff base)
LLAMACLOUD_MAX_DELAY = 120 # max delay between retries (2 minutes)
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
ssl.SSLError,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadError,
httpx.ReadTimeout,
httpx.WriteError,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
httpx.LocalProtocolError,
ConnectionError,
ConnectionResetError,
TimeoutError,
OSError,
)
# ---------------------------------------------------------------------------
# Timeout calculation constants
# ---------------------------------------------------------------------------
UPLOAD_BYTES_PER_SECOND_SLOW = (
100 * 1024
) # 100 KB/s (conservative for slow connections)
MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file
MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files
BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing
PER_PAGE_JOB_TIMEOUT = 60 # 1 minute per page for processing
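# Derived retry schedule (a sketch, before the ±25 % jitter applied in the
# parsing strategies module): with 5 attempts, the delays between retries are
# 10 s, 20 s, 40 s and 80 s; the 120 s cap only comes into play if
# LLAMACLOUD_MAX_RETRIES is ever raised.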


@@ -0,0 +1,90 @@
"""
Lossless file-to-markdown converters for text-based formats.
These converters handle file types that can be faithfully represented as
markdown without any external ETL/OCR service:
- CSV / TSV → markdown table (stdlib ``csv``)
- HTML / HTM → markdown (``markdownify``)
"""
from __future__ import annotations
import csv
from collections.abc import Callable
from pathlib import Path
from markdownify import markdownify
# The stdlib csv module defaults to a 128 KB field-size limit which is too
# small for real-world exports (e.g. chat logs, CRM dumps). We raise it once
# at import time so every csv.reader call in this module can handle large fields.
csv.field_size_limit(2**31 - 1)
def _escape_pipe(cell: str) -> str:
"""Escape literal pipe characters inside a markdown table cell."""
return cell.replace("|", "\\|")
def csv_to_markdown(file_path: str, *, delimiter: str = ",") -> str:
"""Convert a CSV (or TSV) file to a markdown table.
The first row is treated as the header. An empty file returns an
empty string so the caller can decide how to handle it.
"""
with open(file_path, encoding="utf-8", newline="") as fh:
reader = csv.reader(fh, delimiter=delimiter)
rows = list(reader)
if not rows:
return ""
header, *body = rows
col_count = len(header)
lines: list[str] = []
header_cells = [_escape_pipe(c.strip()) for c in header]
lines.append("| " + " | ".join(header_cells) + " |")
lines.append("| " + " | ".join(["---"] * col_count) + " |")
for row in body:
padded = row + [""] * (col_count - len(row))
cells = [_escape_pipe(c.strip()) for c in padded[:col_count]]
lines.append("| " + " | ".join(cells) + " |")
return "\n".join(lines) + "\n"
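# Example output (illustrative) for a two-row CSV "name,notes" / "Ada,loves | pipes":
#
#   | name | notes |
#   | --- | --- |
#   | Ada | loves \| pipes |
#
# Pipes inside cells are escaped so the markdown table structure stays intact.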
def tsv_to_markdown(file_path: str) -> str:
"""Convert a TSV file to a markdown table."""
return csv_to_markdown(file_path, delimiter="\t")
def html_to_markdown(file_path: str) -> str:
"""Convert an HTML file to markdown via ``markdownify``."""
html = Path(file_path).read_text(encoding="utf-8")
return markdownify(html).strip()
_CONVERTER_MAP: dict[str, Callable[..., str]] = {
".csv": csv_to_markdown,
".tsv": tsv_to_markdown,
".html": html_to_markdown,
".htm": html_to_markdown,
}
def convert_file_directly(file_path: str, filename: str) -> str:
"""Dispatch to the appropriate lossless converter based on file extension.
Raises ``ValueError`` if the extension is not supported.
"""
suffix = Path(filename).suffix.lower()
converter = _CONVERTER_MAP.get(suffix)
if converter is None:
raise ValueError(
f"No direct converter for extension '{suffix}' (file: {filename})"
)
return converter(file_path)
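# Illustrative dispatch — a sketch with placeholder paths; file_path and
# filename may differ (e.g. a temp path plus the original upload name):
#
#   convert_file_directly("/tmp/upload-x1", "report.html")  -> markdown string
#   convert_file_directly("/tmp/upload-x2", "report.docx")  -> ValueError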


@@ -0,0 +1,209 @@
"""
ETL parsing strategies for different document processing services.
Provides parse functions for Unstructured, LlamaCloud, and Docling, along with
LlamaCloud retry logic and dynamic timeout calculations.
"""
import asyncio
import logging
import os
import random
import warnings
from logging import ERROR, getLogger
import httpx
from app.config import config as app_config
from app.db import Log
from app.services.task_logging_service import TaskLoggingService
from ._constants import (
LLAMACLOUD_BASE_DELAY,
LLAMACLOUD_MAX_DELAY,
LLAMACLOUD_MAX_RETRIES,
LLAMACLOUD_RETRYABLE_EXCEPTIONS,
PER_PAGE_JOB_TIMEOUT,
)
from ._helpers import calculate_job_timeout, calculate_upload_timeout
# ---------------------------------------------------------------------------
# LlamaCloud parsing with retry
# ---------------------------------------------------------------------------
async def parse_with_llamacloud_retry(
file_path: str,
estimated_pages: int,
task_logger: TaskLoggingService | None = None,
log_entry: Log | None = None,
):
"""
Parse a file with LlamaCloud, retrying transient SSL/connection errors.
Uses dynamic timeout calculations based on file size and page count to handle
very large files reliably.
Returns:
LlamaParse result object
Raises:
Exception: If all retries fail
"""
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
file_size_bytes = os.path.getsize(file_path)
file_size_mb = file_size_bytes / (1024 * 1024)
upload_timeout = calculate_upload_timeout(file_size_bytes)
job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
custom_timeout = httpx.Timeout(
connect=120.0,
read=upload_timeout,
write=upload_timeout,
pool=120.0,
)
logging.info(
f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
f"job_timeout={job_timeout:.0f}s"
)
last_exception = None
attempt_errors: list[str] = []
for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
try:
async with httpx.AsyncClient(timeout=custom_timeout) as custom_client:
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1,
verbose=True,
language="en",
result_type=ResultType.MD,
max_timeout=int(max(2000, job_timeout + upload_timeout)),
job_timeout_in_seconds=job_timeout,
job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT,
custom_client=custom_client,
)
result = await parser.aparse(file_path)
if attempt > 1:
logging.info(
f"LlamaCloud upload succeeded on attempt {attempt} after "
f"{len(attempt_errors)} failures"
)
return result
except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
last_exception = e
error_type = type(e).__name__
error_msg = str(e)[:200]
attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}")
if attempt < LLAMACLOUD_MAX_RETRIES:
base_delay = min(
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
LLAMACLOUD_MAX_DELAY,
)
jitter = base_delay * 0.25 * (2 * random.random() - 1)
delay = base_delay + jitter
if task_logger and log_entry:
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud upload failed "
f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), "
f"retrying in {delay:.0f}s",
{
"error_type": error_type,
"error_message": error_msg,
"attempt": attempt,
"retry_delay": delay,
"file_size_mb": round(file_size_mb, 1),
"upload_timeout": upload_timeout,
},
)
else:
logging.warning(
f"LlamaCloud upload failed "
f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): "
f"{error_type}. File: {file_size_mb:.1f}MB. "
f"Retrying in {delay:.0f}s..."
)
await asyncio.sleep(delay)
else:
logging.error(
f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} "
f"attempts. File size: {file_size_mb:.1f}MB, "
f"Pages: {estimated_pages}. "
f"Errors: {'; '.join(attempt_errors)}"
)
except Exception:
# Anything outside LLAMACLOUD_RETRYABLE_EXCEPTIONS is not retried.
raise
raise last_exception or RuntimeError(
f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. "
f"File size: {file_size_mb:.1f}MB"
)
# ---------------------------------------------------------------------------
# Per-service parse functions
# ---------------------------------------------------------------------------
async def parse_with_unstructured(file_path: str):
"""
Parse a file using the Unstructured ETL service.
Returns:
List of LangChain Document elements.
"""
from langchain_unstructured import UnstructuredLoader
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
return await loader.aload()
async def parse_with_docling(file_path: str, filename: str) -> str:
"""
Parse a file using the Docling ETL service (via the Docling service wrapper).
Returns:
Markdown content string.
"""
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
pdfminer_logger = getLogger("pdfminer")
original_level = pdfminer_logger.level
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
warnings.filterwarnings(
"ignore", message=".*Cannot set gray non-stroke color.*"
)
warnings.filterwarnings("ignore", message=".*invalid float value.*")
pdfminer_logger.setLevel(ERROR)
try:
result = await docling_service.process_document(file_path, filename)
finally:
pdfminer_logger.setLevel(original_level)
return result["content"]


@@ -0,0 +1,218 @@
"""
Document helper functions for deduplication, migration, and connector updates.
Provides reusable logic shared across file processors and ETL strategies.
"""
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType
from app.utils.document_converters import generate_unique_identifier_hash
from ._constants import (
BASE_JOB_TIMEOUT,
MAX_UPLOAD_TIMEOUT,
MIN_UPLOAD_TIMEOUT,
PER_PAGE_JOB_TIMEOUT,
UPLOAD_BYTES_PER_SECOND_SLOW,
)
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
)
# ---------------------------------------------------------------------------
# Unique identifier helpers
# ---------------------------------------------------------------------------
def get_google_drive_unique_identifier(
connector: dict | None,
filename: str,
search_space_id: int,
) -> tuple[str, str | None]:
"""
Get unique identifier hash, using file_id for Google Drive (stable across renames).
Returns:
Tuple of (primary_hash, legacy_hash or None).
For Google Drive: (file_id-based hash, filename-based hash for migration).
For other sources: (filename-based hash, None).
"""
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
metadata = connector.get("metadata", {})
file_id = metadata.get("google_drive_file_id")
if file_id:
primary_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
legacy_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id
)
return primary_hash, legacy_hash
primary_hash = generate_unique_identifier_hash(
DocumentType.FILE, filename, search_space_id
)
return primary_hash, None
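# Illustrative results — a sketch where hash(...) is shorthand for
# generate_unique_identifier_hash(...) and "1AbC" is a placeholder file id:
#
#   connector = {"type": DocumentType.GOOGLE_DRIVE_FILE,
#                "metadata": {"google_drive_file_id": "1AbC"}}
#   get_google_drive_unique_identifier(connector, "report.pdf", 1)
#       -> (hash(GOOGLE_DRIVE_FILE, "1AbC", 1), hash(GOOGLE_DRIVE_FILE, "report.pdf", 1))
#
#   get_google_drive_unique_identifier(None, "report.pdf", 1)
#       -> (hash(FILE, "report.pdf", 1), None)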
# ---------------------------------------------------------------------------
# Document deduplication and migration
# ---------------------------------------------------------------------------
async def handle_existing_document_update(
session: AsyncSession,
existing_document: Document,
content_hash: str,
connector: dict | None,
filename: str,
primary_hash: str,
) -> tuple[bool, Document | None]:
"""
Handle update logic for an existing document.
Returns:
Tuple of (should_skip_processing, document_to_return):
- (True, document): Content unchanged, return existing document
- (False, None): Content changed, needs re-processing
"""
if existing_document.unique_identifier_hash != primary_hash:
existing_document.unique_identifier_hash = primary_hash
logging.info(f"Migrated document to file_id-based identifier: {filename}")
if existing_document.content_hash == content_hash:
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
connector_metadata = connector.get("metadata", {})
new_name = connector_metadata.get("google_drive_file_name")
doc_metadata = existing_document.document_metadata or {}
old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get(
"google_drive_file_name"
)
if new_name and old_name and old_name != new_name:
from sqlalchemy.orm.attributes import flag_modified
existing_document.title = new_name
if not existing_document.document_metadata:
existing_document.document_metadata = {}
existing_document.document_metadata["FILE_NAME"] = new_name
existing_document.document_metadata["google_drive_file_name"] = new_name
flag_modified(existing_document, "document_metadata")
await session.commit()
logging.info(
f"File renamed in Google Drive: '{old_name}''{new_name}' "
f"(no re-processing needed)"
)
logging.info(f"Document for file {filename} unchanged. Skipping.")
return True, existing_document
# Content has changed — guard against content_hash collision before
# expensive ETL processing.
collision_doc = await check_duplicate_document(session, content_hash)
if collision_doc and collision_doc.id != existing_document.id:
logging.warning(
"Content-hash collision for %s: identical content exists in "
"document #%s (%s). Skipping re-processing.",
filename,
collision_doc.id,
collision_doc.document_type,
)
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PENDING
) or DocumentStatus.is_state(
existing_document.status, DocumentStatus.PROCESSING
):
await session.delete(existing_document)
await session.commit()
return True, None
return True, existing_document
logging.info(f"Content changed for file {filename}. Updating document.")
return False, None
async def find_existing_document_with_migration(
session: AsyncSession,
primary_hash: str,
legacy_hash: str | None,
content_hash: str | None = None,
) -> Document | None:
"""
Find existing document, checking primary hash, legacy hash, and content_hash.
Supports migration from filename-based to file_id-based hashing for
Google Drive files, with content_hash fallback for cross-source dedup.
"""
existing_document = await check_document_by_unique_identifier(session, primary_hash)
if not existing_document and legacy_hash:
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
if existing_document:
logging.info(
"Found legacy document (filename-based hash), "
"will migrate to file_id-based hash"
)
if not existing_document and content_hash:
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Found duplicate content from different source (content_hash match). "
f"Original document ID: {existing_document.id}, "
f"type: {existing_document.document_type}"
)
return existing_document
# ---------------------------------------------------------------------------
# Connector helpers
# ---------------------------------------------------------------------------
async def update_document_from_connector(
document: Document | None,
connector: dict | None,
session: AsyncSession,
) -> None:
"""Update document type, metadata, and connector_id from connector info."""
if not document or not connector:
return
if "type" in connector:
document.document_type = connector["type"]
if "metadata" in connector:
if not document.document_metadata:
document.document_metadata = connector["metadata"]
else:
merged = {**document.document_metadata, **connector["metadata"]}
document.document_metadata = merged
if "connector_id" in connector:
document.connector_id = connector["connector_id"]
await session.commit()
# ---------------------------------------------------------------------------
# Timeout calculations
# ---------------------------------------------------------------------------
def calculate_upload_timeout(file_size_bytes: int) -> float:
"""Calculate upload timeout based on file size (conservative for slow connections)."""
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
"""Calculate job processing timeout based on page count and file size."""
page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
return max(page_based_timeout, size_based_timeout)
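# Worked examples using the constants from ._constants:
#
#   calculate_upload_timeout(10 * 1024 * 1024)    # 10 MB file
#       (10 MiB / 100 KiB/s) * 1.5 = 153.6 s, inside the 120 s..1800 s clamp
#   calculate_upload_timeout(500 * 1024 * 1024)   # 500 MB file -> 1800 s (clamped)
#
#   calculate_job_timeout(50, 100 * 1024 * 1024)  # 50 pages, 100 MB
#       page-based: 600 + 50 * 60 = 3600 s; size-based: 600 + 10 * 60 = 1200 s
#       -> max(3600, 1200) = 3600 s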


@@ -0,0 +1,285 @@
"""
Unified document save/update logic for file processors.
Replaces the three nearly-identical ``add_received_file_document_using_*``
functions with a single ``save_file_document`` function plus thin wrappers
for backward compatibility.
"""
import logging
from langchain_core.documents import Document as LangChainDocument
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
)
from ._helpers import (
find_existing_document_with_migration,
get_google_drive_unique_identifier,
handle_existing_document_update,
)
from .base import get_current_timestamp, safe_set_chunks
# ---------------------------------------------------------------------------
# Summary generation
# ---------------------------------------------------------------------------
async def _generate_summary(
markdown_content: str,
file_name: str,
etl_service: str,
user_llm,
enable_summary: bool,
) -> tuple[str, list[float]]:
"""
Generate a document summary and embedding.
Docling uses its own large-document summary strategy; other ETL services
use the standard ``generate_document_summary`` helper.
"""
if not enable_summary:
summary = f"File: {file_name}\n\n{markdown_content[:4000]}"
return summary, embed_text(summary)
if etl_service == "DOCLING":
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
summary_text = await docling_service.process_large_document_summary(
content=markdown_content, llm=user_llm, document_title=file_name
)
meta = {
"file_name": file_name,
"etl_service": etl_service,
"document_type": "File Document",
}
parts = ["# DOCUMENT METADATA"]
for key, value in meta.items():
if value:
formatted_key = key.replace("_", " ").title()
parts.append(f"**{formatted_key}:** {value}")
enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text
return enhanced, embed_text(enhanced)
# Standard summary (Unstructured / LlamaCloud / others)
meta = {
"file_name": file_name,
"etl_service": etl_service,
"document_type": "File Document",
}
return await generate_document_summary(markdown_content, user_llm, meta)
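# Shape of the enhanced Docling summary (illustrative; "report.pdf" and the
# summary body are placeholders). Note key.replace("_", " ").title() yields
# "Etl Service" for "etl_service":
#
#   # DOCUMENT METADATA
#   **File Name:** report.pdf
#   **Etl Service:** DOCLING
#   **Document Type:** File Document
#
#   # DOCUMENT SUMMARY
#   <summary_text>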
# ---------------------------------------------------------------------------
# Unified save function
# ---------------------------------------------------------------------------
async def save_file_document(
session: AsyncSession,
file_name: str,
markdown_content: str,
search_space_id: int,
user_id: str,
etl_service: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store a file document with deduplication and migration support.
Handles both creating new documents and updating existing ones. This is
the single implementation behind the per-ETL-service wrapper functions.
Args:
session: Database session
file_name: Name of the processed file
markdown_content: Markdown content to store
search_space_id: ID of the search space
user_id: ID of the user
etl_service: Name of the ETL service (UNSTRUCTURED, LLAMACLOUD, DOCLING)
connector: Optional connector info for Google Drive files
enable_summary: Whether to generate an AI summary
Returns:
Document object if successful, None if duplicate detected
"""
try:
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash, content_hash
)
if existing_document:
should_skip, doc = await handle_existing_document_update(
session,
existing_document,
content_hash,
connector,
file_name,
primary_hash,
)
if should_skip:
return doc
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} "
f"in search space {search_space_id}"
)
summary_content, summary_embedding = await _generate_summary(
markdown_content, file_name, etl_service, user_llm, enable_summary
)
chunks = await create_document_chunks(markdown_content)
doc_metadata = {"FILE_NAME": file_name, "ETL_SERVICE": etl_service}
if existing_document:
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = doc_metadata
await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = markdown_content
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready()
await session.commit()
await session.refresh(existing_document)
return existing_document
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=doc_type,
document_metadata=doc_metadata,
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=primary_hash,
source_markdown=markdown_content,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(),
)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (%s). Skipping.",
file_name,
etl_service,
)
return None
raise db_error
except Exception as e:
await session.rollback()
raise RuntimeError(
f"Failed to process file document using {etl_service}: {e!s}"
) from e
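# Illustrative call — a sketch; session, markdown, and the id values are
# placeholders supplied by the caller:
#
#   doc = await save_file_document(
#       session,
#       file_name="report.pdf",
#       markdown_content=markdown,
#       search_space_id=1,
#       user_id="user-123",
#       etl_service="LLAMACLOUD",
#   )
#   # doc is None when an identical content_hash already exists in the space.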
# ---------------------------------------------------------------------------
# Backward-compatible wrapper functions
# ---------------------------------------------------------------------------
async def add_received_file_document_using_unstructured(
session: AsyncSession,
file_name: str,
unstructured_processed_elements: list[LangChainDocument],
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store a file document using the Unstructured service."""
from app.utils.document_converters import convert_document_to_markdown
markdown_content = await convert_document_to_markdown(
unstructured_processed_elements
)
return await save_file_document(
session,
file_name,
markdown_content,
search_space_id,
user_id,
"UNSTRUCTURED",
connector,
enable_summary,
)
async def add_received_file_document_using_llamacloud(
session: AsyncSession,
file_name: str,
llamacloud_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store document content parsed by LlamaCloud."""
return await save_file_document(
session,
file_name,
llamacloud_markdown_document,
search_space_id,
user_id,
"LLAMACLOUD",
connector,
enable_summary,
)
async def add_received_file_document_using_docling(
session: AsyncSession,
file_name: str,
docling_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store document content parsed by Docling."""
return await save_file_document(
session,
file_name,
docling_markdown_document,
search_space_id,
user_id,
"DOCLING",
connector,
enable_summary,
)


@@ -14,88 +14,19 @@ from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from ._helpers import (
find_existing_document_with_migration,
get_google_drive_unique_identifier,
)
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
get_current_timestamp,
safe_set_chunks,
)
def _get_google_drive_unique_identifier(
connector: dict | None,
filename: str,
search_space_id: int,
) -> tuple[str, str | None]:
"""
Get unique identifier hash for a file, with special handling for Google Drive.
For Google Drive files, uses file_id as the unique identifier (doesn't change on rename).
For other files, uses filename.
Args:
connector: Optional connector info dict with type and metadata
filename: The filename (used for non-Google Drive files or as fallback)
search_space_id: The search space ID
Returns:
Tuple of (primary_hash, legacy_hash or None)
"""
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
metadata = connector.get("metadata", {})
file_id = metadata.get("google_drive_file_id")
if file_id:
primary_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
legacy_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id
)
return primary_hash, legacy_hash
primary_hash = generate_unique_identifier_hash(
DocumentType.FILE, filename, search_space_id
)
return primary_hash, None
async def _find_existing_document_with_migration(
session: AsyncSession,
primary_hash: str,
legacy_hash: str | None,
content_hash: str | None = None,
) -> Document | None:
"""
Find existing document, checking both new hash and legacy hash for migration,
with fallback to content_hash for cross-source deduplication.
"""
existing_document = await check_document_by_unique_identifier(session, primary_hash)
if not existing_document and legacy_hash:
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
if existing_document:
logging.info(
"Found legacy document (filename-based hash), will migrate to file_id-based hash"
)
# Fallback: check by content_hash to catch duplicates from different sources
if not existing_document and content_hash:
existing_document = await check_duplicate_document(session, content_hash)
if existing_document:
logging.info(
f"Found duplicate content from different source (content_hash match). "
f"Original document ID: {existing_document.id}, type: {existing_document.document_type}"
)
return existing_document
async def _handle_existing_document_update(
session: AsyncSession,
existing_document: Document,
@@ -224,7 +155,7 @@ async def add_received_markdown_file_document(
try:
# Generate unique identifier hash (uses file_id for Google Drive, filename for others)
primary_hash, legacy_hash = _get_google_drive_unique_identifier(
primary_hash, legacy_hash = get_google_drive_unique_identifier(
connector, file_name, search_space_id
)
@@ -232,7 +163,7 @@ async def add_received_markdown_file_document(
content_hash = generate_content_hash(file_in_markdown, search_space_id)
# Check if document exists (with migration support for Google Drive and content_hash fallback)
existing_document = await _find_existing_document_with_migration(
existing_document = await find_existing_document_with_migration(
session, primary_hash, legacy_hash, content_hash
)