# Mirror of https://github.com/MODSetter/SurfSense.git
# (synced 2026-04-25 00:36:31 +02:00; source listing: 398 lines, 12 KiB, Python).
"""
Service for managing user page limits for ETL services.
"""

import logging
import os
from pathlib import Path, PurePosixPath

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


class PageLimitExceededError(Exception):
    """
    Exception raised when a user exceeds their page processing limit.

    The numeric arguments are stored as attributes of the same name so that
    handlers can build their own response instead of parsing ``message``.

    Args:
        message: Human-readable description, passed through to ``Exception``.
        pages_used: Page count already recorded for the user.
        pages_limit: The user's page limit.
        pages_to_add: Page count the rejected operation tried to add.
    """

    def __init__(
        self,
        message: str = "Page limit exceeded. Please contact admin to increase limits for your account.",
        pages_used: int = 0,
        pages_limit: int = 0,
        pages_to_add: int = 0,
    ):
        self.pages_used = pages_used
        self.pages_limit = pages_limit
        self.pages_to_add = pages_to_add
        # Only the message goes into ``args``; the counters live on the instance.
        super().__init__(message)


class PageLimitService:
    """Service for checking and updating user page limits."""

    # Rough characters-per-page figure shared by the text-based estimators.
    _CHARS_PER_PAGE = 2000

    # Size-based fallback divisor for extensions not listed below.
    _DEFAULT_BYTES_PER_PAGE = 80 * 1024

    # Image / drawing extensions: always counted as a single page.
    _SINGLE_PAGE_EXTENSIONS = frozenset(
        {
            ".jpg",
            ".jpeg",
            ".png",
            ".gif",
            ".bmp",
            ".tiff",
            ".webp",
            ".svg",
            ".cgm",
            ".odg",
            ".pbd",
        }
    )

    # (extensions, approximate bytes per page) for the size-based estimate.
    # The groups are disjoint, so lookup order does not affect the result.
    _BYTES_PER_PAGE_BY_EXTENSION = (
        # PDF
        (frozenset({".pdf"}), 100 * 1024),
        # Word-processing documents
        (
            frozenset(
                {
                    ".doc",
                    ".docx",
                    ".docm",
                    ".dot",
                    ".dotm",
                    ".odt",
                    ".ott",
                    ".sxw",
                    ".stw",
                    ".uot",
                    ".rtf",
                    ".pages",
                    ".wpd",
                    ".wps",
                    ".abw",
                    ".zabw",
                    ".cwk",
                    ".hwp",
                    ".lwp",
                    ".mcw",
                    ".mw",
                    ".sdw",
                    ".vor",
                }
            ),
            50 * 1024,
        ),
        # Presentations
        (
            frozenset(
                {
                    ".ppt",
                    ".pptx",
                    ".pptm",
                    ".pot",
                    ".potx",
                    ".odp",
                    ".otp",
                    ".sxi",
                    ".sti",
                    ".uop",
                    ".key",
                    ".sda",
                    ".sdd",
                    ".sdp",
                }
            ),
            200 * 1024,
        ),
        # Spreadsheets and tabular data
        (
            frozenset(
                {
                    ".xls",
                    ".xlsx",
                    ".xlsm",
                    ".xlsb",
                    ".xlw",
                    ".xlr",
                    ".ods",
                    ".ots",
                    ".fods",
                    ".numbers",
                    ".123",
                    ".wk1",
                    ".wk2",
                    ".wk3",
                    ".wk4",
                    ".wks",
                    ".wb1",
                    ".wb2",
                    ".wb3",
                    ".wq1",
                    ".wq2",
                    ".csv",
                    ".tsv",
                    ".slk",
                    ".sylk",
                    ".dif",
                    ".dbf",
                    ".prn",
                    ".qpw",
                    ".602",
                    ".et",
                    ".eth",
                }
            ),
            100 * 1024,
        ),
        # E-books
        (frozenset({".epub"}), 50 * 1024),
        # Plain text and markup
        (
            frozenset({".txt", ".log", ".md", ".markdown", ".htm", ".html", ".xml"}),
            3000,
        ),
        # Audio
        (frozenset({".mp3", ".m4a", ".wav", ".mpga"}), 1024 * 1024),
        # Video
        (frozenset({".mp4", ".mpeg", ".webm"}), 5 * 1024 * 1024),
    )

    def __init__(self, session: AsyncSession):
        self.session = session

    async def check_page_limit(
        self, user_id: str, estimated_pages: int = 1
    ) -> tuple[bool, int, int]:
        """
        Check if user has enough pages remaining for processing.

        Args:
            user_id: The user's ID
            estimated_pages: Estimated number of pages to be processed

        Returns:
            Tuple of (has_capacity, pages_used, pages_limit). ``has_capacity``
            is always True on return; the over-limit case raises instead.

        Raises:
            ValueError: If no user with ``user_id`` exists
            PageLimitExceededError: If user would exceed their page limit
        """
        from app.db import User

        # Get user's current page usage
        result = await self.session.execute(
            select(User.pages_used, User.pages_limit).where(User.id == user_id)
        )
        row = result.first()

        if row is None:
            raise ValueError(f"User with ID {user_id} not found")

        pages_used, pages_limit = row

        # Check if adding estimated pages would exceed limit
        if pages_used + estimated_pages > pages_limit:
            raise PageLimitExceededError(
                message=f"Processing this document would exceed your page limit. "
                f"Used: {pages_used}/{pages_limit} pages. "
                f"Document has approximately {estimated_pages} page(s). "
                f"Please contact admin to increase limits for your account.",
                pages_used=pages_used,
                pages_limit=pages_limit,
                pages_to_add=estimated_pages,
            )

        return True, pages_used, pages_limit

    async def update_page_usage(
        self, user_id: str, pages_to_add: int, allow_exceed: bool = False
    ) -> int:
        """
        Update user's page usage after successful processing.

        Commits the session and refreshes the user row before returning.

        Args:
            user_id: The user's ID
            pages_to_add: Number of pages to add to usage
            allow_exceed: If True, allows update even if it exceeds limit
                (used when document was already processed after passing initial check)

        Returns:
            New total pages_used value

        Raises:
            ValueError: If no user with ``user_id`` exists
            PageLimitExceededError: If adding pages would exceed limit and allow_exceed is False
        """
        from app.db import User

        # Get user
        result = await self.session.execute(select(User).where(User.id == user_id))
        user = result.unique().scalar_one_or_none()

        if user is None:
            raise ValueError(f"User with ID {user_id} not found")

        # Check if this would exceed limit (only if allow_exceed is False)
        new_usage = user.pages_used + pages_to_add
        if not allow_exceed and new_usage > user.pages_limit:
            raise PageLimitExceededError(
                message=f"Cannot update page usage. Would exceed limit. "
                f"Current: {user.pages_used}/{user.pages_limit}, "
                f"Trying to add: {pages_to_add}",
                pages_used=user.pages_used,
                pages_limit=user.pages_limit,
                pages_to_add=pages_to_add,
            )

        # Update usage
        user.pages_used = new_usage
        await self.session.commit()
        await self.session.refresh(user)

        return user.pages_used

    async def get_page_usage(self, user_id: str) -> tuple[int, int]:
        """
        Get user's current page usage and limit.

        Args:
            user_id: The user's ID

        Returns:
            Tuple of (pages_used, pages_limit)

        Raises:
            ValueError: If no user with ``user_id`` exists
        """
        from app.db import User

        result = await self.session.execute(
            select(User.pages_used, User.pages_limit).where(User.id == user_id)
        )
        row = result.first()

        if row is None:
            raise ValueError(f"User with ID {user_id} not found")

        # The result row is returned as-is; it unpacks positionally as
        # (pages_used, pages_limit), matching the selected columns.
        return row

    def estimate_pages_from_elements(self, elements: list) -> int:
        """
        Estimate page count from document elements (for Unstructured).

        Counts the distinct ``page_number`` values found in element metadata.
        If no element carries one, falls back to total ``page_content``
        length at ~2000 characters per page (minimum 1).

        Args:
            elements: List of document elements

        Returns:
            Estimated number of pages
        """
        # Collect distinct page numbers reported in element metadata
        page_numbers = set()
        for element in elements:
            metadata = getattr(element, "metadata", None)
            if not metadata:
                continue
            page_num = metadata.get("page_number")
            if page_num is not None:
                page_numbers.add(page_num)

        # If we found page numbers in metadata, use that count
        if page_numbers:
            return len(page_numbers)

        # Otherwise, estimate from content length
        total_content_length = sum(
            len(element.page_content) if hasattr(element, "page_content") else 0
            for element in elements
        )
        return max(1, total_content_length // self._CHARS_PER_PAGE)

    def estimate_pages_from_markdown(self, markdown_documents: list) -> int:
        """
        Estimate page count from markdown documents (for LlamaCloud).

        A document whose metadata has a ``page`` or ``page_number`` entry
        counts as exactly one page; any other document is estimated from
        its ``text`` length at ~2000 characters per page (minimum 1 each).

        Args:
            markdown_documents: List of markdown document objects

        Returns:
            Estimated number of pages (1 for an empty list)
        """
        if not markdown_documents:
            return 1

        total_pages = 0
        for doc in markdown_documents:
            metadata = getattr(doc, "metadata", None)
            if metadata:
                # If metadata contains page info, the document is one page
                page_num = metadata.get("page", metadata.get("page_number"))
                if page_num is not None:
                    total_pages += 1
                    continue

            # Otherwise estimate from content length
            content_length = len(doc.text) if hasattr(doc, "text") else 0
            total_pages += max(1, content_length // self._CHARS_PER_PAGE)

        return max(1, total_pages)

    def estimate_pages_from_content_length(self, content_length: int) -> int:
        """
        Estimate page count from content length (for Docling).

        Args:
            content_length: Length of the document content

        Returns:
            Estimated number of pages (~2000 characters per page, minimum 1)
        """
        return max(1, content_length // self._CHARS_PER_PAGE)

    @staticmethod
    def estimate_pages_from_metadata(
        file_name_or_ext: str, file_size: int | str | None = None
    ) -> int:
        """Size-based page estimation from file name/extension and byte size.

        Pure function — no file I/O, no database access. Used by cloud
        connectors (which only have API metadata) and as the internal
        fallback for :meth:`estimate_pages_before_processing`.

        ``file_name_or_ext`` can be a full filename (``"report.pdf"``) or
        a bare extension (``".pdf"``). A missing name (``None`` or ``""``)
        is treated as "unknown extension" and uses the default divisor.
        ``file_size`` may be an int, a stringified int from a cloud API,
        or *None*.

        Returns:
            Estimated number of pages, never less than 1. A missing,
            unparseable or non-positive ``file_size`` yields 1.
        """
        try:
            size = int(file_size) if file_size is not None else 0
        except (ValueError, TypeError):
            size = 0

        if size <= 0:
            return 1

        name = file_name_or_ext or ""
        ext = PurePosixPath(name).suffix.lower()
        # A bare extension such as ".pdf" parses as a dot-file with no
        # suffix, so fall back to the string itself.
        if not ext and name.startswith("."):
            ext = name.lower()

        if ext in PageLimitService._SINGLE_PAGE_EXTENSIONS:
            return 1

        bytes_per_page = PageLimitService._DEFAULT_BYTES_PER_PAGE
        for extensions, group_bytes in PageLimitService._BYTES_PER_PAGE_BY_EXTENSION:
            if ext in extensions:
                bytes_per_page = group_bytes
                break

        return max(1, size // bytes_per_page)

    def estimate_pages_before_processing(self, file_path: str) -> int:
        """
        Estimate page count from a local file before processing.

        For PDFs, attempts to read the actual page count via pypdf.
        For everything else (and for PDFs pypdf cannot read), delegates to
        :meth:`estimate_pages_from_metadata`.

        Args:
            file_path: Path to the file

        Returns:
            Estimated number of pages

        Raises:
            ValueError: If ``file_path`` does not exist
        """
        if not os.path.exists(file_path):
            raise ValueError(f"File not found: {file_path}")

        file_ext = Path(file_path).suffix.lower()
        file_size = os.path.getsize(file_path)

        if file_ext == ".pdf":
            try:
                import pypdf

                with open(file_path, "rb") as f:
                    return len(pypdf.PdfReader(f).pages)
            except Exception as exc:
                # Best effort: any pypdf failure (missing package, corrupt or
                # encrypted file, ...) falls through to the size-based estimate.
                logging.getLogger(__name__).warning(
                    "Could not read PDF page count for %s (%s); "
                    "falling back to size-based estimate",
                    file_path,
                    exc,
                )

        return self.estimate_pages_from_metadata(file_ext, file_size)