feat: Implement file upload limits and page limit enforcement in backend

- Added constants for maximum files per upload, per-file size, and total upload size.
- Enhanced document upload route to validate file counts and sizes, returning appropriate HTTP errors.
- Introduced end-to-end tests for upload limits and page limit enforcement, ensuring correct behavior under various scenarios.
- Updated test helpers to support notification retrieval for page limit exceeded scenarios.
This commit is contained in:
Anish Sarkar 2026-02-26 01:25:34 +05:30
parent 93c0af475b
commit a57ab02900
7 changed files with 628 additions and 2 deletions

View file

@ -44,6 +44,10 @@ os.environ["UNSTRUCTURED_HAS_PATCHED_LOOP"] = "1"
router = APIRouter()
MAX_FILES_PER_UPLOAD = 10
MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024 # 50 MB per file
MAX_TOTAL_SIZE_BYTES = 200 * 1024 * 1024 # 200 MB total
@router.post("/documents")
async def create_documents(
@ -148,12 +152,37 @@ async def create_documents_file_upload(
if not files:
raise HTTPException(status_code=400, detail="No files provided")
if len(files) > MAX_FILES_PER_UPLOAD:
raise HTTPException(
status_code=413,
detail=f"Too many files. Maximum {MAX_FILES_PER_UPLOAD} files per upload.",
)
total_size = 0
for file in files:
file_size = file.size or 0
if file_size > MAX_FILE_SIZE_BYTES:
raise HTTPException(
status_code=413,
detail=f"File '{file.filename}' ({file_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
)
total_size += file_size
if total_size > MAX_TOTAL_SIZE_BYTES:
raise HTTPException(
status_code=413,
detail=f"Total upload size ({total_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_TOTAL_SIZE_BYTES // (1024 * 1024)} MB limit.",
)
created_documents: list[Document] = []
files_to_process: list[
tuple[Document, str, str]
] = [] # (document, temp_path, filename)
skipped_duplicates = 0
duplicate_document_ids: list[int] = []
actual_total_size = 0
# ===== PHASE 1: Create pending documents for all files =====
# This makes ALL documents visible in the UI immediately with pending status
@ -169,11 +198,28 @@ async def create_documents_file_upload(
temp_path = temp_file.name
content = await file.read()
file_size = len(content)
if file_size > MAX_FILE_SIZE_BYTES:
os.unlink(temp_path)
raise HTTPException(
status_code=413,
detail=f"File '{file.filename}' ({file_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
)
actual_total_size += file_size
if actual_total_size > MAX_TOTAL_SIZE_BYTES:
os.unlink(temp_path)
raise HTTPException(
status_code=413,
detail=f"Total upload size ({actual_total_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_TOTAL_SIZE_BYTES // (1024 * 1024)} MB limit.",
)
with open(temp_path, "wb") as f:
f.write(content)
file_size = len(content)
# Generate unique identifier for deduplication check
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file.filename or "unknown", search_space_id

View file

@ -177,6 +177,8 @@ markers = [
"document: document upload and processing tests",
"connector: connector indexing tests",
"chat: chat and agent tests",
"page_limit: page limit enforcement tests",
"upload_limit: file upload limit validation tests",
]
[tool.setuptools.packages.find]

View file

@ -13,6 +13,7 @@ from dotenv import load_dotenv
from tests.utils.helpers import (
BACKEND_URL,
TEST_EMAIL,
auth_headers,
delete_document,
get_auth_token,
@ -139,3 +140,67 @@ async def _cleanup_documents(
)
finally:
await conn.close()
# ---------------------------------------------------------------------------
# Page-limit helpers (direct DB access)
# ---------------------------------------------------------------------------
async def _get_user_page_usage(email: str) -> tuple[int, int]:
"""Return ``(pages_used, pages_limit)`` for the given user."""
conn = await asyncpg.connect(DATABASE_URL)
try:
row = await conn.fetchrow(
'SELECT pages_used, pages_limit FROM "user" WHERE email = $1',
email,
)
assert row is not None, f"User {email!r} not found in database"
return row["pages_used"], row["pages_limit"]
finally:
await conn.close()
async def _set_user_page_limits(
email: str, *, pages_used: int, pages_limit: int
) -> None:
"""Overwrite ``pages_used`` and ``pages_limit`` for the given user."""
conn = await asyncpg.connect(DATABASE_URL)
try:
await conn.execute(
'UPDATE "user" SET pages_used = $1, pages_limit = $2 WHERE email = $3',
pages_used,
pages_limit,
email,
)
finally:
await conn.close()
@pytest.fixture
async def page_limits():
"""
Fixture that exposes helpers for manipulating the test user's page limits.
Automatically restores the original values after each test.
Usage inside a test::
await page_limits.set(pages_used=0, pages_limit=100)
used, limit = await page_limits.get()
"""
class _PageLimits:
async def set(self, *, pages_used: int, pages_limit: int) -> None:
await _set_user_page_limits(
TEST_EMAIL, pages_used=pages_used, pages_limit=pages_limit
)
async def get(self) -> tuple[int, int]:
return await _get_user_page_usage(TEST_EMAIL)
original = await _get_user_page_usage(TEST_EMAIL)
yield _PageLimits()
await _set_user_page_limits(
TEST_EMAIL, pages_used=original[0], pages_limit=original[1]
)

View file

@ -0,0 +1,318 @@
"""
End-to-end tests for page-limit enforcement during document upload.
These tests manipulate the test user's ``pages_used`` / ``pages_limit``
columns directly in the database and then exercise the upload pipeline to
verify that:
- Uploads are rejected *before* ETL when the limit is exhausted.
- ``pages_used`` increases after a successful upload.
- A ``page_limit_exceeded`` notification is created on rejection.
- ``pages_used`` is not modified when a document fails processing.
All tests reuse the existing small fixtures (``sample.pdf``, ``sample.txt``)
so no additional processing time is introduced.
Prerequisites (must be running):
- FastAPI backend
- PostgreSQL + pgvector
- Redis
- Celery worker
"""
from __future__ import annotations
import httpx
import pytest
from tests.utils.helpers import (
get_notifications,
poll_document_status,
upload_file,
)
pytestmark = pytest.mark.page_limit
# ---------------------------------------------------------------------------
# Test A: Successful upload increments pages_used
# ---------------------------------------------------------------------------
class TestPageUsageIncrementsOnSuccess:
"""After a successful PDF upload the user's ``pages_used`` must grow."""
async def test_pages_used_increases_after_pdf_upload(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
page_limits,
):
await page_limits.set(pages_used=0, pages_limit=1000)
resp = await upload_file(
client, headers, "sample.pdf", search_space_id=search_space_id
)
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
statuses = await poll_document_status(
client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "ready"
used, _ = await page_limits.get()
assert used > 0, "pages_used should have increased after successful processing"
# ---------------------------------------------------------------------------
# Test B: Upload rejected when page limit is fully exhausted
# ---------------------------------------------------------------------------
class TestUploadRejectedWhenLimitExhausted:
"""
When ``pages_used == pages_limit`` (zero remaining) the document
should reach ``failed`` status with a page-limit reason.
"""
async def test_pdf_fails_when_no_pages_remaining(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
page_limits,
):
await page_limits.set(pages_used=100, pages_limit=100)
resp = await upload_file(
client, headers, "sample.pdf", search_space_id=search_space_id
)
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
statuses = await poll_document_status(
client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "failed"
reason = statuses[did]["status"].get("reason", "").lower()
assert "page limit" in reason, (
f"Expected 'page limit' in failure reason, got: {reason!r}"
)
async def test_pages_used_unchanged_after_limit_rejection(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
page_limits,
):
await page_limits.set(pages_used=50, pages_limit=50)
resp = await upload_file(
client, headers, "sample.pdf", search_space_id=search_space_id
)
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
await poll_document_status(
client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
)
used, _ = await page_limits.get()
assert used == 50, (
f"pages_used should remain 50 after rejected upload, got {used}"
)
# ---------------------------------------------------------------------------
# Test C: Page-limit notification is created on rejection
# ---------------------------------------------------------------------------
class TestPageLimitNotification:
"""A ``page_limit_exceeded`` notification must be created when upload
is rejected due to the limit."""
async def test_page_limit_exceeded_notification_created(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
page_limits,
):
await page_limits.set(pages_used=100, pages_limit=100)
resp = await upload_file(
client, headers, "sample.pdf", search_space_id=search_space_id
)
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
await poll_document_status(
client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
)
notifications = await get_notifications(
client,
headers,
type_filter="page_limit_exceeded",
search_space_id=search_space_id,
)
assert len(notifications) >= 1, (
"Expected at least one page_limit_exceeded notification"
)
latest = notifications[0]
assert "page limit" in latest["title"].lower() or "page limit" in latest["message"].lower(), (
f"Notification should mention page limit: title={latest['title']!r}, "
f"message={latest['message']!r}"
)
# ---------------------------------------------------------------------------
# Test D: Successful upload creates a completed document_processing notification
# ---------------------------------------------------------------------------
class TestDocumentProcessingNotification:
"""A ``document_processing`` notification with ``completed`` status must
exist after a successful upload."""
async def test_processing_completed_notification_exists(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
page_limits,
):
await page_limits.set(pages_used=0, pages_limit=1000)
resp = await upload_file(
client, headers, "sample.txt", search_space_id=search_space_id
)
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
await poll_document_status(
client, headers, doc_ids, search_space_id=search_space_id
)
notifications = await get_notifications(
client,
headers,
type_filter="document_processing",
search_space_id=search_space_id,
)
completed = [n for n in notifications if n.get("metadata", {}).get("processing_stage") == "completed"]
assert len(completed) >= 1, (
"Expected at least one document_processing notification with 'completed' stage"
)
# ---------------------------------------------------------------------------
# Test E: pages_used unchanged when a document fails for non-limit reasons
# ---------------------------------------------------------------------------
class TestPagesUnchangedOnProcessingFailure:
"""If a document fails during ETL (e.g. empty/corrupt file) rather than
a page-limit rejection, ``pages_used`` should remain unchanged."""
async def test_pages_used_stable_on_etl_failure(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
page_limits,
):
await page_limits.set(pages_used=10, pages_limit=1000)
resp = await upload_file(
client, headers, "empty.pdf", search_space_id=search_space_id
)
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
if doc_ids:
statuses = await poll_document_status(
client, headers, doc_ids, search_space_id=search_space_id, timeout=120.0
)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "failed"
used, _ = await page_limits.get()
assert used == 10, (
f"pages_used should remain 10 after ETL failure, got {used}"
)
# ---------------------------------------------------------------------------
# Test F: Second upload rejected after first consumes remaining quota
# ---------------------------------------------------------------------------
class TestSecondUploadExceedsLimit:
"""Upload one PDF successfully, consuming the quota, then verify a
second upload is rejected."""
async def test_second_upload_rejected_after_quota_consumed(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
page_limits,
):
# Give just enough room for one ~1-page PDF
await page_limits.set(pages_used=0, pages_limit=1)
resp1 = await upload_file(
client, headers, "sample.pdf", search_space_id=search_space_id
)
assert resp1.status_code == 200
first_ids = resp1.json()["document_ids"]
cleanup_doc_ids.extend(first_ids)
statuses1 = await poll_document_status(
client, headers, first_ids, search_space_id=search_space_id, timeout=300.0
)
for did in first_ids:
assert statuses1[did]["status"]["state"] == "ready"
# Second upload — should fail because quota is now consumed
resp2 = await upload_file(
client,
headers,
"sample.pdf",
search_space_id=search_space_id,
filename_override="sample_copy.pdf",
)
assert resp2.status_code == 200
second_ids = resp2.json()["document_ids"]
cleanup_doc_ids.extend(second_ids)
statuses2 = await poll_document_status(
client, headers, second_ids, search_space_id=search_space_id, timeout=300.0
)
for did in second_ids:
assert statuses2[did]["status"]["state"] == "failed"
reason = statuses2[did]["status"].get("reason", "").lower()
assert "page limit" in reason, (
f"Expected 'page limit' in failure reason, got: {reason!r}"
)

View file

@ -0,0 +1,143 @@
"""
End-to-end tests for backend file upload limit enforcement.
These tests verify that the API rejects uploads that exceed:
- Max files per upload (10)
- Max per-file size (50 MB)
- Max total upload size (200 MB)
The limits mirror the frontend's DocumentUploadTab.tsx constants and are
enforced server-side to protect against direct API calls.
Prerequisites (must be running):
- FastAPI backend
- PostgreSQL + pgvector
"""
from __future__ import annotations
import io
import httpx
import pytest
pytestmark = pytest.mark.upload_limit
# ---------------------------------------------------------------------------
# Test A: File count limit
# ---------------------------------------------------------------------------
class TestFileCountLimit:
"""Uploading more than 10 files in a single request should be rejected."""
async def test_11_files_returns_413(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
):
files = [
("files", (f"file_{i}.txt", io.BytesIO(b"test content"), "text/plain"))
for i in range(11)
]
resp = await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files=files,
data={"search_space_id": str(search_space_id)},
)
assert resp.status_code == 413
assert "too many files" in resp.json()["detail"].lower()
async def test_10_files_accepted(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
):
files = [
("files", (f"file_{i}.txt", io.BytesIO(b"test content"), "text/plain"))
for i in range(10)
]
resp = await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files=files,
data={"search_space_id": str(search_space_id)},
)
assert resp.status_code == 200
cleanup_doc_ids.extend(resp.json().get("document_ids", []))
# ---------------------------------------------------------------------------
# Test B: Per-file size limit
# ---------------------------------------------------------------------------
class TestPerFileSizeLimit:
"""A single file exceeding 50 MB should be rejected."""
async def test_oversized_file_returns_413(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
):
oversized = io.BytesIO(b"\x00" * (50 * 1024 * 1024 + 1))
resp = await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files=[("files", ("big.pdf", oversized, "application/pdf"))],
data={"search_space_id": str(search_space_id)},
)
assert resp.status_code == 413
assert "per-file limit" in resp.json()["detail"].lower()
async def test_file_at_limit_accepted(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
):
at_limit = io.BytesIO(b"\x00" * (50 * 1024 * 1024))
resp = await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files=[("files", ("exact50mb.txt", at_limit, "text/plain"))],
data={"search_space_id": str(search_space_id)},
)
assert resp.status_code == 200
cleanup_doc_ids.extend(resp.json().get("document_ids", []))
# ---------------------------------------------------------------------------
# Test C: Total upload size limit
# ---------------------------------------------------------------------------
class TestTotalSizeLimit:
"""Multiple files whose combined size exceeds 200 MB should be rejected."""
async def test_total_size_over_200mb_returns_413(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
):
chunk_size = 45 * 1024 * 1024 # 45 MB each
files = [
("files", (f"chunk_{i}.txt", io.BytesIO(b"\x00" * chunk_size), "text/plain"))
for i in range(5) # 5 x 45 MB = 225 MB > 200 MB
]
resp = await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files=files,
data={"search_space_id": str(search_space_id)},
)
assert resp.status_code == 413
assert "total upload size" in resp.json()["detail"].lower()

View file

@ -184,3 +184,29 @@ async def delete_document(
f"/api/v1/documents/{document_id}",
headers=headers,
)
async def get_notifications(
client: httpx.AsyncClient,
headers: dict[str, str],
*,
type_filter: str | None = None,
search_space_id: int | None = None,
limit: int = 50,
) -> list[dict]:
"""Fetch notifications for the authenticated user, optionally filtered by type."""
params: dict[str, str | int] = {"limit": limit}
if type_filter:
params["type"] = type_filter
if search_space_id is not None:
params["search_space_id"] = search_space_id
resp = await client.get(
"/api/v1/notifications",
headers=headers,
params=params,
)
assert resp.status_code == 200, (
f"GET notifications failed ({resp.status_code}): {resp.text}"
)
return resp.json()["items"]

View file

@ -82,6 +82,10 @@ const CYCLING_PLACEHOLDERS = [
const CHAT_UPLOAD_ACCEPT =
".pdf,.doc,.docx,.txt,.md,.markdown,.ppt,.pptx,.xls,.xlsx,.xlsm,.xlsb,.csv,.html,.htm,.xml,.rtf,.epub,.jpg,.jpeg,.png,.bmp,.webp,.tiff,.tif,.mp3,.mp4,.mpeg,.mpga,.m4a,.wav,.webm";
const CHAT_MAX_FILES = 10;
const CHAT_MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024; // 50 MB per file
const CHAT_MAX_TOTAL_SIZE_BYTES = 200 * 1024 * 1024; // 200 MB total
type UploadState = "pending" | "processing" | "ready" | "failed";
interface UploadedMentionDoc {
@ -534,6 +538,28 @@ const Composer: FC = () => {
event.target.value = "";
if (files.length === 0 || !search_space_id) return;
if (files.length > CHAT_MAX_FILES) {
toast.error(`Too many files. Maximum ${CHAT_MAX_FILES} files per upload.`);
return;
}
let totalSize = 0;
for (const file of files) {
if (file.size > CHAT_MAX_FILE_SIZE_BYTES) {
toast.error(
`File "${file.name}" (${(file.size / (1024 * 1024)).toFixed(1)} MB) exceeds the ${CHAT_MAX_FILE_SIZE_BYTES / (1024 * 1024)} MB per-file limit.`
);
return;
}
totalSize += file.size;
}
if (totalSize > CHAT_MAX_TOTAL_SIZE_BYTES) {
toast.error(
`Total upload size (${(totalSize / (1024 * 1024)).toFixed(1)} MB) exceeds the ${CHAT_MAX_TOTAL_SIZE_BYTES / (1024 * 1024)} MB limit.`
);
return;
}
setIsUploadingDocs(true);
try {
const uploadResponse = await documentsApiService.uploadDocument({