SurfSense/surfsense_backend/tests/e2e/test_page_limits.py
"""
End-to-end tests for page-limit enforcement during document upload.

These tests manipulate the test user's ``pages_used`` / ``pages_limit``
columns directly in the database and then exercise the upload pipeline to
verify that:

- Uploads are rejected *before* ETL when the limit is exhausted.
- ``pages_used`` increases after a successful upload.
- A ``page_limit_exceeded`` notification is created on rejection.
- ``pages_used`` is not modified when a document fails processing.

All tests reuse the existing small fixtures (``sample.pdf``, ``sample.txt``,
``empty.pdf``), so they add no significant processing time.

Prerequisites (must be running):

- FastAPI backend
- PostgreSQL + pgvector
- Redis
- Celery worker
"""
from __future__ import annotations

import httpx
import pytest

from tests.utils.helpers import (
    get_notifications,
    poll_document_status,
    upload_file,
)

pytestmark = pytest.mark.e2e
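
# NOTE: these tests lean on fixtures assumed to be provided by the e2e conftest
# (not shown here): ``client``/``headers`` for an authenticated HTTP session,
# ``search_space_id`` and ``cleanup_doc_ids`` for scoping and teardown, and
# ``page_limits``, which is used below as an async wrapper around the test
# user's quota columns:
#
#     await page_limits.set(pages_used=0, pages_limit=1000)  # write both columns
#     used, limit = await page_limits.get()                   # read them back

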
# ---------------------------------------------------------------------------
# Test A: Successful upload increments pages_used
# ---------------------------------------------------------------------------
class TestPageUsageIncrementsOnSuccess:
    """After a successful PDF upload the user's ``pages_used`` must grow."""

    async def test_pages_used_increases_after_pdf_upload(
        self,
        client: httpx.AsyncClient,
        headers: dict[str, str],
        search_space_id: int,
        cleanup_doc_ids: list[int],
        page_limits,
    ):
        await page_limits.set(pages_used=0, pages_limit=1000)
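        # The upload call returns as soon as the files are accepted; processing
        # runs asynchronously (Celery), so poll until a terminal state below.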
        resp = await upload_file(
            client, headers, "sample.pdf", search_space_id=search_space_id
        )
        assert resp.status_code == 200
        doc_ids = resp.json()["document_ids"]
        cleanup_doc_ids.extend(doc_ids)

        statuses = await poll_document_status(
            client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
        )
        for did in doc_ids:
            assert statuses[did]["status"]["state"] == "ready"

        used, _ = await page_limits.get()
        assert used > 0, "pages_used should have increased after successful processing"


# ---------------------------------------------------------------------------
# Test B: Upload rejected when page limit is fully exhausted
# ---------------------------------------------------------------------------
class TestUploadRejectedWhenLimitExhausted:
    """
    When ``pages_used == pages_limit`` (zero remaining) the document
    should reach ``failed`` status with a page-limit reason.
    """

    async def test_pdf_fails_when_no_pages_remaining(
        self,
        client: httpx.AsyncClient,
        headers: dict[str, str],
        search_space_id: int,
        cleanup_doc_ids: list[int],
        page_limits,
    ):
        await page_limits.set(pages_used=100, pages_limit=100)
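        # The API still accepts the upload (HTTP 200); the quota check runs in
        # the background task before ETL, so the rejection surfaces as a failed
        # document status rather than an HTTP error.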
        resp = await upload_file(
            client, headers, "sample.pdf", search_space_id=search_space_id
        )
        assert resp.status_code == 200
        doc_ids = resp.json()["document_ids"]
        cleanup_doc_ids.extend(doc_ids)

        statuses = await poll_document_status(
            client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
        )
        for did in doc_ids:
            assert statuses[did]["status"]["state"] == "failed"
            reason = statuses[did]["status"].get("reason", "").lower()
            assert "page limit" in reason, (
                f"Expected 'page limit' in failure reason, got: {reason!r}"
            )

    async def test_pages_used_unchanged_after_limit_rejection(
        self,
        client: httpx.AsyncClient,
        headers: dict[str, str],
        search_space_id: int,
        cleanup_doc_ids: list[int],
        page_limits,
    ):
        await page_limits.set(pages_used=50, pages_limit=50)
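        # Quota is already fully consumed (50/50), so the upload should be
        # rejected and the counter must not move in either direction.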
        resp = await upload_file(
            client, headers, "sample.pdf", search_space_id=search_space_id
        )
        assert resp.status_code == 200
        doc_ids = resp.json()["document_ids"]
        cleanup_doc_ids.extend(doc_ids)

        await poll_document_status(
            client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
        )

        used, _ = await page_limits.get()
        assert used == 50, (
            f"pages_used should remain 50 after rejected upload, got {used}"
        )


# ---------------------------------------------------------------------------
# Test C: Page-limit notification is created on rejection
# ---------------------------------------------------------------------------
class TestPageLimitNotification:
    """A ``page_limit_exceeded`` notification must be created when an upload
    is rejected due to the limit."""

    async def test_page_limit_exceeded_notification_created(
        self,
        client: httpx.AsyncClient,
        headers: dict[str, str],
        search_space_id: int,
        cleanup_doc_ids: list[int],
        page_limits,
    ):
        await page_limits.set(pages_used=100, pages_limit=100)

        resp = await upload_file(
            client, headers, "sample.pdf", search_space_id=search_space_id
        )
        assert resp.status_code == 200
        doc_ids = resp.json()["document_ids"]
        cleanup_doc_ids.extend(doc_ids)

        await poll_document_status(
            client, headers, doc_ids, search_space_id=search_space_id, timeout=300.0
        )
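        # The rejection is expected to emit a page_limit_exceeded notification;
        # the helper filters by type and search space so unrelated
        # notifications do not pollute the assertion.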
        notifications = await get_notifications(
            client,
            headers,
            type_filter="page_limit_exceeded",
            search_space_id=search_space_id,
        )
        assert len(notifications) >= 1, (
            "Expected at least one page_limit_exceeded notification"
        )
        latest = notifications[0]
        assert (
            "page limit" in latest["title"].lower()
            or "page limit" in latest["message"].lower()
        ), (
            f"Notification should mention page limit: title={latest['title']!r}, "
            f"message={latest['message']!r}"
        )


# ---------------------------------------------------------------------------
# Test D: Successful upload creates a completed document_processing notification
# ---------------------------------------------------------------------------
class TestDocumentProcessingNotification:
    """A ``document_processing`` notification with ``completed`` status must
    exist after a successful upload."""

    async def test_processing_completed_notification_exists(
        self,
        client: httpx.AsyncClient,
        headers: dict[str, str],
        search_space_id: int,
        cleanup_doc_ids: list[int],
        page_limits,
    ):
        await page_limits.set(pages_used=0, pages_limit=1000)

        resp = await upload_file(
            client, headers, "sample.txt", search_space_id=search_space_id
        )
        assert resp.status_code == 200
        doc_ids = resp.json()["document_ids"]
        cleanup_doc_ids.extend(doc_ids)

        await poll_document_status(
            client, headers, doc_ids, search_space_id=search_space_id
        )

        notifications = await get_notifications(
            client,
            headers,
            type_filter="document_processing",
            search_space_id=search_space_id,
        )
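        # The notification payload is assumed to carry a metadata dict with a
        # ``processing_stage`` field; "completed" marks a successful run.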
        completed = [
            n
            for n in notifications
            if n.get("metadata", {}).get("processing_stage") == "completed"
        ]
        assert len(completed) >= 1, (
            "Expected at least one document_processing notification with 'completed' stage"
        )


# ---------------------------------------------------------------------------
# Test E: pages_used unchanged when a document fails for non-limit reasons
# ---------------------------------------------------------------------------
class TestPagesUnchangedOnProcessingFailure:
    """If a document fails during ETL (e.g. an empty or corrupt file) rather
    than through a page-limit rejection, ``pages_used`` should remain unchanged."""

    async def test_pages_used_stable_on_etl_failure(
        self,
        client: httpx.AsyncClient,
        headers: dict[str, str],
        search_space_id: int,
        cleanup_doc_ids: list[int],
        page_limits,
    ):
        await page_limits.set(pages_used=10, pages_limit=1000)

        resp = await upload_file(
            client, headers, "empty.pdf", search_space_id=search_space_id
        )
        assert resp.status_code == 200
        doc_ids = resp.json()["document_ids"]
        cleanup_doc_ids.extend(doc_ids)
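        # Depending on the backend's validation, an empty file may be rejected
        # at upload time and yield no document ids at all; only poll if a
        # document was actually created.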
        if doc_ids:
            statuses = await poll_document_status(
                client, headers, doc_ids, search_space_id=search_space_id, timeout=120.0
            )
            for did in doc_ids:
                assert statuses[did]["status"]["state"] == "failed"

        used, _ = await page_limits.get()
        assert used == 10, f"pages_used should remain 10 after ETL failure, got {used}"


# ---------------------------------------------------------------------------
# Test F: Second upload rejected after first consumes remaining quota
# ---------------------------------------------------------------------------
class TestSecondUploadExceedsLimit:
    """Upload one PDF successfully, consuming the quota, then verify that a
    second upload is rejected."""

    async def test_second_upload_rejected_after_quota_consumed(
        self,
        client: httpx.AsyncClient,
        headers: dict[str, str],
        search_space_id: int,
        cleanup_doc_ids: list[int],
        page_limits,
    ):
        # Give just enough room for one ~1-page PDF.
        await page_limits.set(pages_used=0, pages_limit=1)

        resp1 = await upload_file(
            client, headers, "sample.pdf", search_space_id=search_space_id
        )
        assert resp1.status_code == 200
        first_ids = resp1.json()["document_ids"]
        cleanup_doc_ids.extend(first_ids)

        statuses1 = await poll_document_status(
            client, headers, first_ids, search_space_id=search_space_id, timeout=300.0
        )
        for did in first_ids:
            assert statuses1[did]["status"]["state"] == "ready"

        # Second upload: should fail because the quota is now consumed.
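        # filename_override is assumed to give the copy a distinct name so the
        # backend does not treat it as a re-upload of the first document.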
        resp2 = await upload_file(
            client,
            headers,
            "sample.pdf",
            search_space_id=search_space_id,
            filename_override="sample_copy.pdf",
        )
        assert resp2.status_code == 200
        second_ids = resp2.json()["document_ids"]
        cleanup_doc_ids.extend(second_ids)

        statuses2 = await poll_document_status(
            client, headers, second_ids, search_space_id=search_space_id, timeout=300.0
        )
        for did in second_ids:
            assert statuses2[did]["status"]["state"] == "failed"
            reason = statuses2[did]["status"].get("reason", "").lower()
            assert "page limit" in reason, (
                f"Expected 'page limit' in failure reason, got: {reason!r}"
            )