feat: Add end-to-end tests for document upload pipeline and shared test utilities

- Introduced new test files for end-to-end testing of document uploads, including support for .txt, .md, and .pdf formats.
- Created shared fixtures and helper functions for authentication, document management, and cleanup.
- Added sample documents for testing purposes.
- Established a conftest.py file to provide reusable fixtures across test modules.
This commit is contained in:
Anish Sarkar 2026-02-25 16:39:45 +05:30
parent b7447b26f9
commit 41eb68663a
10 changed files with 802 additions and 0 deletions

View file

View file

@ -0,0 +1,73 @@
"""Root conftest — shared fixtures available to all test modules."""
from __future__ import annotations
import contextlib
from collections.abc import AsyncGenerator
import httpx
import pytest
from tests.utils.helpers import (
BACKEND_URL,
TEST_SEARCH_SPACE_ID,
auth_headers,
delete_document,
get_auth_token,
)
@pytest.fixture(scope="session")
def backend_url() -> str:
return BACKEND_URL
@pytest.fixture(scope="session")
def search_space_id() -> int:
return TEST_SEARCH_SPACE_ID
@pytest.fixture(scope="session")
async def auth_token(backend_url: str) -> str:
"""Authenticate once per session and return the JWT token."""
async with httpx.AsyncClient(
base_url=backend_url, timeout=30.0
) as client:
return await get_auth_token(client)
@pytest.fixture(scope="session")
def headers(auth_token: str) -> dict[str, str]:
"""Authorization headers reused across all tests in the session."""
return auth_headers(auth_token)
@pytest.fixture
async def client(backend_url: str) -> AsyncGenerator[httpx.AsyncClient]:
"""Per-test async HTTP client pointing at the running backend."""
async with httpx.AsyncClient(
base_url=backend_url, timeout=180.0
) as c:
yield c
@pytest.fixture
def cleanup_doc_ids() -> list[int]:
"""Accumulator for document IDs that should be deleted after the test."""
return []
@pytest.fixture(autouse=True)
async def _cleanup_documents(
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
"""
Runs after every test. Deletes any document IDs that were appended to
the ``cleanup_doc_ids`` list during the test body.
"""
yield
for doc_id in cleanup_doc_ids:
with contextlib.suppress(Exception):
await delete_document(client, headers, doc_id)

View file

View file

@ -0,0 +1,487 @@
"""
End-to-end tests for manual document upload.
These tests exercise the full pipeline:
API upload Celery task ETL extraction chunking embedding DB storage
Prerequisites (must be running):
- FastAPI backend
- PostgreSQL + pgvector
- Redis
- Celery worker
"""
from __future__ import annotations
import shutil
from pathlib import Path
import httpx
from tests.utils.helpers import (
FIXTURES_DIR,
TEST_SEARCH_SPACE_ID,
delete_document,
get_document,
poll_document_status,
upload_file,
upload_multiple_files,
)
# ---------------------------------------------------------------------------
# Helpers local to this module
# ---------------------------------------------------------------------------
def _assert_document_ready(doc: dict, *, expected_filename: str) -> None:
"""Common assertions for a successfully processed document."""
assert doc["title"] == expected_filename
assert doc["document_type"] == "FILE"
assert doc["content"], "Document content (summary) should not be empty"
assert doc["content_hash"], "content_hash should be set"
assert doc["document_metadata"].get("FILE_NAME") == expected_filename
assert doc["document_metadata"].get("ETL_SERVICE"), "ETL_SERVICE should be set"
if doc.get("status"):
assert doc["status"]["state"] == "ready"
# ---------------------------------------------------------------------------
# Test A: Upload a .txt file (direct read path — no ETL service needed)
# ---------------------------------------------------------------------------
class TestTxtFileUpload:
"""Upload a plain-text file and verify the full pipeline."""
async def test_upload_txt_returns_document_id(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.txt")
assert resp.status_code == 200
body = resp.json()
assert body["pending_files"] >= 1
assert len(body["document_ids"]) >= 1
cleanup_doc_ids.extend(body["document_ids"])
async def test_txt_processing_reaches_ready(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.txt")
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
statuses = await poll_document_status(client, headers, doc_ids)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "ready"
async def test_txt_document_fields_populated(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.txt")
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
await poll_document_status(client, headers, doc_ids)
doc = await get_document(client, headers, doc_ids[0])
_assert_document_ready(doc, expected_filename="sample.txt")
assert doc["document_metadata"]["ETL_SERVICE"] == "MARKDOWN"
# ---------------------------------------------------------------------------
# Test B: Upload a .md file (markdown direct-read path)
# ---------------------------------------------------------------------------
class TestMarkdownFileUpload:
"""Upload a Markdown file and verify the full pipeline."""
async def test_md_processing_reaches_ready(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.md")
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
statuses = await poll_document_status(client, headers, doc_ids)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "ready"
async def test_md_document_fields_populated(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.md")
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
await poll_document_status(client, headers, doc_ids)
doc = await get_document(client, headers, doc_ids[0])
_assert_document_ready(doc, expected_filename="sample.md")
assert doc["document_metadata"]["ETL_SERVICE"] == "MARKDOWN"
# ---------------------------------------------------------------------------
# Test C: Upload a .pdf file (ETL path — Docling / Unstructured)
# ---------------------------------------------------------------------------
class TestPdfFileUpload:
"""Upload a PDF and verify it goes through the ETL extraction pipeline."""
async def test_pdf_processing_reaches_ready(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.pdf")
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
statuses = await poll_document_status(
client, headers, doc_ids, timeout=300.0
)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "ready"
async def test_pdf_document_fields_populated(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.pdf")
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
await poll_document_status(
client, headers, doc_ids, timeout=300.0
)
doc = await get_document(client, headers, doc_ids[0])
_assert_document_ready(doc, expected_filename="sample.pdf")
assert doc["document_metadata"]["ETL_SERVICE"] in {
"DOCLING",
"UNSTRUCTURED",
"LLAMACLOUD",
}
# ---------------------------------------------------------------------------
# Test D: Upload multiple files in a single request
# ---------------------------------------------------------------------------
class TestMultiFileUpload:
"""Upload several files at once and verify all are processed."""
async def test_multi_upload_returns_all_ids(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_multiple_files(
client, headers, ["sample.txt", "sample.md"]
)
assert resp.status_code == 200
body = resp.json()
assert body["pending_files"] == 2
assert len(body["document_ids"]) == 2
cleanup_doc_ids.extend(body["document_ids"])
async def test_multi_upload_all_reach_ready(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_multiple_files(
client, headers, ["sample.txt", "sample.md"]
)
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
statuses = await poll_document_status(client, headers, doc_ids)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "ready"
# ---------------------------------------------------------------------------
# Test E: Duplicate file upload (same file uploaded twice)
# ---------------------------------------------------------------------------
class TestDuplicateFileUpload:
"""
Uploading the exact same file a second time should be detected as a
duplicate via ``unique_identifier_hash``.
"""
async def test_duplicate_file_is_skipped(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
# First upload
resp1 = await upload_file(client, headers, "sample.txt")
assert resp1.status_code == 200
first_ids = resp1.json()["document_ids"]
cleanup_doc_ids.extend(first_ids)
await poll_document_status(client, headers, first_ids)
# Second upload of the same file
resp2 = await upload_file(client, headers, "sample.txt")
assert resp2.status_code == 200
body2 = resp2.json()
assert body2["skipped_duplicates"] >= 1
assert len(body2["duplicate_document_ids"]) >= 1
cleanup_doc_ids.extend(body2.get("document_ids", []))
# ---------------------------------------------------------------------------
# Test F: Duplicate content detection (different name, same content)
# ---------------------------------------------------------------------------
class TestDuplicateContentDetection:
"""
Uploading a file with a different name but identical content should be
detected as duplicate content via ``content_hash``.
"""
async def test_same_content_different_name_detected(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
tmp_path: Path,
):
# First upload
resp1 = await upload_file(client, headers, "sample.txt")
assert resp1.status_code == 200
first_ids = resp1.json()["document_ids"]
cleanup_doc_ids.extend(first_ids)
await poll_document_status(client, headers, first_ids)
# Copy fixture content to a differently named temp file
src = FIXTURES_DIR / "sample.txt"
dest = tmp_path / "renamed_sample.txt"
shutil.copy2(src, dest)
with open(dest, "rb") as f:
resp2 = await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files={"files": ("renamed_sample.txt", f)},
data={"search_space_id": str(TEST_SEARCH_SPACE_ID)},
)
assert resp2.status_code == 200
second_ids = resp2.json()["document_ids"]
cleanup_doc_ids.extend(second_ids)
if second_ids:
statuses = await poll_document_status(client, headers, second_ids)
for did in second_ids:
assert statuses[did]["status"]["state"] == "failed"
assert "duplicate" in (
statuses[did]["status"].get("reason", "").lower()
)
# ---------------------------------------------------------------------------
# Test G: Empty / corrupt file handling
# ---------------------------------------------------------------------------
class TestEmptyFileUpload:
"""An empty file should be processed but ultimately fail gracefully."""
async def test_empty_pdf_fails(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "empty.pdf")
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
if doc_ids:
statuses = await poll_document_status(
client, headers, doc_ids, timeout=120.0
)
for did in doc_ids:
assert statuses[did]["status"]["state"] == "failed"
assert statuses[did]["status"].get("reason"), (
"Failed document should include a reason"
)
# ---------------------------------------------------------------------------
# Test H: Upload without authentication
# ---------------------------------------------------------------------------
class TestUnauthenticatedUpload:
"""Requests without a valid JWT should be rejected."""
async def test_upload_without_auth_returns_401(
self,
client: httpx.AsyncClient,
):
file_path = FIXTURES_DIR / "sample.txt"
with open(file_path, "rb") as f:
resp = await client.post(
"/api/v1/documents/fileupload",
files={"files": ("sample.txt", f)},
data={"search_space_id": str(TEST_SEARCH_SPACE_ID)},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Test I: Upload with no files attached
# ---------------------------------------------------------------------------
class TestNoFilesUpload:
"""Submitting the form with zero files should return a validation error."""
async def test_no_files_returns_error(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
):
resp = await client.post(
"/api/v1/documents/fileupload",
headers=headers,
data={"search_space_id": str(TEST_SEARCH_SPACE_ID)},
)
assert resp.status_code in {400, 422}
# ---------------------------------------------------------------------------
# Test J: Document deletion after successful upload
# ---------------------------------------------------------------------------
class TestDocumentDeletion:
"""Upload, wait for ready, delete, then verify it's gone."""
async def test_delete_processed_document(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
):
resp = await upload_file(client, headers, "sample.txt")
doc_ids = resp.json()["document_ids"]
await poll_document_status(client, headers, doc_ids)
del_resp = await delete_document(client, headers, doc_ids[0])
assert del_resp.status_code == 200
get_resp = await client.get(
f"/api/v1/documents/{doc_ids[0]}",
headers=headers,
)
assert get_resp.status_code == 404
# ---------------------------------------------------------------------------
# Test K: Cannot delete a document while it is still processing
# ---------------------------------------------------------------------------
class TestDeleteWhileProcessing:
"""Attempting to delete a pending/processing document should be rejected."""
async def test_delete_pending_document_returns_409(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.pdf")
assert resp.status_code == 200
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
# Immediately try to delete before processing finishes
del_resp = await delete_document(client, headers, doc_ids[0])
assert del_resp.status_code == 409
# Let it finish so cleanup can work
await poll_document_status(
client, headers, doc_ids, timeout=300.0
)
# ---------------------------------------------------------------------------
# Test L: Status polling returns correct structure
# ---------------------------------------------------------------------------
class TestStatusPolling:
"""Verify the status endpoint returns well-formed responses."""
async def test_status_endpoint_returns_items(
self,
client: httpx.AsyncClient,
headers: dict[str, str],
cleanup_doc_ids: list[int],
):
resp = await upload_file(client, headers, "sample.txt")
doc_ids = resp.json()["document_ids"]
cleanup_doc_ids.extend(doc_ids)
status_resp = await client.get(
"/api/v1/documents/status",
headers=headers,
params={
"search_space_id": TEST_SEARCH_SPACE_ID,
"document_ids": ",".join(str(d) for d in doc_ids),
},
)
assert status_resp.status_code == 200
body = status_resp.json()
assert "items" in body
assert len(body["items"]) == len(doc_ids)
for item in body["items"]:
assert "id" in item
assert "status" in item
assert "state" in item["status"]
assert item["status"]["state"] in {
"pending",
"processing",
"ready",
"failed",
}
await poll_document_status(client, headers, doc_ids)

View file

View file

@ -0,0 +1,51 @@
# SurfSense Test Document
## Overview
This is a **sample markdown document** used for end-to-end testing of the manual
document upload pipeline. It includes various markdown formatting elements.
## Key Features
- Document upload and processing
- Automatic chunking of content
- Embedding generation for semantic search
- Real-time status tracking via ElectricSQL
## Technical Architecture
### Backend Stack
The SurfSense backend is built with:
1. **FastAPI** for the REST API
2. **PostgreSQL** with pgvector for vector storage
3. **Celery** with Redis for background task processing
4. **Docling/Unstructured** for document parsing (ETL)
### Processing Pipeline
Documents go through a multi-stage pipeline:
| Stage | Description |
|-------|-------------|
| Upload | File received via API endpoint |
| Parsing | Content extracted using ETL service |
| Chunking | Text split into semantic chunks |
| Embedding | Vector representations generated |
| Storage | Chunks stored with embeddings in pgvector |
## Code Example
```python
async def process_document(file_path: str) -> Document:
content = extract_content(file_path)
chunks = create_chunks(content)
embeddings = generate_embeddings(chunks)
return store_document(chunks, embeddings)
```
## Conclusion
This document serves as a test fixture to validate the complete document processing
pipeline from upload through to chunk creation and embedding storage.

Binary file not shown.

View file

@ -0,0 +1,34 @@
SurfSense Document Upload Test
This is a sample text document used for end-to-end testing of the manual document
upload pipeline in SurfSense. The document contains multiple paragraphs to ensure
that the chunking system has enough content to work with.
Artificial Intelligence and Machine Learning
Artificial intelligence (AI) is a broad field of computer science concerned with
building smart machines capable of performing tasks that typically require human
intelligence. Machine learning is a subset of AI that enables systems to learn and
improve from experience without being explicitly programmed.
Natural Language Processing
Natural language processing (NLP) is a subfield of linguistics, computer science,
and artificial intelligence concerned with the interactions between computers and
human language. Key applications include machine translation, sentiment analysis,
text summarization, and question answering systems.
Vector Databases and Semantic Search
Vector databases store data as high-dimensional vectors, enabling efficient
similarity search operations. When combined with embedding models, they power
semantic search systems that understand the meaning behind queries rather than
relying on exact keyword matches. This technology is fundamental to modern
retrieval-augmented generation (RAG) systems.
Document Processing Pipelines
Modern document processing pipelines involve several stages: extraction, transformation,
chunking, embedding generation, and storage. Each stage plays a critical role in
converting raw documents into searchable, structured knowledge that can be retrieved
and used by AI systems for accurate information retrieval and generation.

View file

@ -0,0 +1,157 @@
"""Shared test helpers for authentication, polling, and cleanup."""
from __future__ import annotations
import asyncio
import os
from pathlib import Path
import httpx
FIXTURES_DIR = Path(__file__).resolve().parent.parent / "fixtures"
BACKEND_URL = os.environ.get("TEST_BACKEND_URL", "http://localhost:8000")
TEST_EMAIL = os.environ.get("TEST_USER_EMAIL", "testuser@surfsense.com")
TEST_PASSWORD = os.environ.get("TEST_USER_PASSWORD", "testpassword123")
TEST_SEARCH_SPACE_ID = int(os.environ.get("TEST_SEARCH_SPACE_ID", "1"))
async def get_auth_token(client: httpx.AsyncClient) -> str:
"""Log in and return a Bearer JWT token."""
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert response.status_code == 200, (
f"Login failed ({response.status_code}): {response.text}"
)
return response.json()["access_token"]
def auth_headers(token: str) -> dict[str, str]:
"""Return Authorization header dict for a Bearer token."""
return {"Authorization": f"Bearer {token}"}
async def upload_file(
client: httpx.AsyncClient,
headers: dict[str, str],
fixture_name: str,
*,
search_space_id: int = TEST_SEARCH_SPACE_ID,
filename_override: str | None = None,
) -> httpx.Response:
"""Upload a single fixture file and return the raw response."""
file_path = FIXTURES_DIR / fixture_name
upload_name = filename_override or fixture_name
with open(file_path, "rb") as f:
return await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files={"files": (upload_name, f)},
data={"search_space_id": str(search_space_id)},
)
async def upload_multiple_files(
client: httpx.AsyncClient,
headers: dict[str, str],
fixture_names: list[str],
*,
search_space_id: int = TEST_SEARCH_SPACE_ID,
) -> httpx.Response:
"""Upload multiple fixture files in a single request."""
files = []
open_handles = []
try:
for name in fixture_names:
fh = open(FIXTURES_DIR / name, "rb") # noqa: SIM115
open_handles.append(fh)
files.append(("files", (name, fh)))
return await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files=files,
data={"search_space_id": str(search_space_id)},
)
finally:
for fh in open_handles:
fh.close()
async def poll_document_status(
client: httpx.AsyncClient,
headers: dict[str, str],
document_ids: list[int],
*,
search_space_id: int = TEST_SEARCH_SPACE_ID,
timeout: float = 180.0,
interval: float = 3.0,
) -> dict[int, dict]:
"""
Poll ``GET /api/v1/documents/status`` until every document reaches a
terminal state (``ready`` or ``failed``) or *timeout* seconds elapse.
Returns a mapping of ``{document_id: status_item_dict}``.
"""
ids_param = ",".join(str(d) for d in document_ids)
terminal_states = {"ready", "failed"}
elapsed = 0.0
while elapsed < timeout:
resp = await client.get(
"/api/v1/documents/status",
headers=headers,
params={
"search_space_id": search_space_id,
"document_ids": ids_param,
},
)
assert resp.status_code == 200, (
f"Status poll failed ({resp.status_code}): {resp.text}"
)
items = {item["id"]: item for item in resp.json()["items"]}
if all(
items.get(did, {}).get("status", {}).get("state") in terminal_states
for did in document_ids
):
return items
await asyncio.sleep(interval)
elapsed += interval
raise TimeoutError(
f"Documents {document_ids} did not reach terminal state within {timeout}s. "
f"Last status: {items}"
)
async def get_document(
client: httpx.AsyncClient,
headers: dict[str, str],
document_id: int,
) -> dict:
"""Fetch a single document by ID."""
resp = await client.get(
f"/api/v1/documents/{document_id}",
headers=headers,
)
assert resp.status_code == 200, (
f"GET document {document_id} failed ({resp.status_code}): {resp.text}"
)
return resp.json()
async def delete_document(
client: httpx.AsyncClient,
headers: dict[str, str],
document_id: int,
) -> httpx.Response:
"""Delete a document by ID, returning the raw response."""
return await client.delete(
f"/api/v1/documents/{document_id}",
headers=headers,
)