diff --git a/surfsense_backend/tests/conftest.py b/surfsense_backend/tests/conftest.py index effde2714..868664ca8 100644 --- a/surfsense_backend/tests/conftest.py +++ b/surfsense_backend/tests/conftest.py @@ -2,11 +2,14 @@ from __future__ import annotations -import contextlib +import os from collections.abc import AsyncGenerator +from pathlib import Path +import asyncpg import httpx import pytest +from dotenv import load_dotenv from tests.utils.helpers import ( BACKEND_URL, @@ -14,9 +17,38 @@ from tests.utils.helpers import ( delete_document, get_auth_token, get_search_space_id, - poll_document_status, ) +load_dotenv(Path(__file__).resolve().parent.parent / ".env") + +DATABASE_URL = os.environ.get( + "TEST_DATABASE_URL", + os.environ.get("DATABASE_URL", ""), +).replace("postgresql+asyncpg://", "postgresql://") + + +async def _force_delete_documents_db( + search_space_id: int, +) -> int: + """ + Bypass the API and delete documents directly from the database. + + This handles stuck documents in pending/processing state that the API + refuses to delete (409 Conflict). Chunks are cascade-deleted by the + foreign key constraint. + + Returns the number of deleted rows. + """ + conn = await asyncpg.connect(DATABASE_URL) + try: + result = await conn.execute( + "DELETE FROM documents WHERE search_space_id = $1", + search_space_id, + ) + return int(result.split()[-1]) + finally: + await conn.close() + @pytest.fixture(scope="session") def backend_url() -> str: @@ -43,68 +75,18 @@ async def search_space_id(backend_url: str, auth_token: str) -> int: @pytest.fixture(scope="session", autouse=True) async def _purge_test_search_space( - backend_url: str, - auth_token: str, search_space_id: int, ): """ Delete all documents in the test search space before the session starts. - Ensures no stale data from a previous run interferes with the current - session. Paginates through all documents and waits for any in-flight - documents to reach a terminal state before deleting. + Uses direct database access to bypass the API's 409 protection on + pending/processing documents. This ensures stuck documents from + previous crashed runs are always cleaned up. """ - hdrs = auth_headers(auth_token) - async with httpx.AsyncClient(base_url=backend_url, timeout=60.0) as client: - all_docs: list[dict] = [] - page = 0 - page_size = 200 - - while True: - resp = await client.get( - "/api/v1/documents", - headers=hdrs, - params={ - "search_space_id": search_space_id, - "page": page, - "page_size": page_size, - }, - ) - if resp.status_code != 200: - break - - body = resp.json() - all_docs.extend(body.get("items", [])) - - if not body.get("has_more", False): - break - page += 1 - - if not all_docs: - yield - return - - in_flight = [ - doc["id"] - for doc in all_docs - if doc.get("status", {}).get("state") in ("pending", "processing") - ] - if in_flight: - with contextlib.suppress(Exception): - await poll_document_status( - client, - hdrs, - in_flight, - search_space_id=search_space_id, - timeout=120.0, - ) - - for doc in all_docs: - with contextlib.suppress(Exception): - await client.delete( - f"/api/v1/documents/{doc['id']}", - headers=hdrs, - ) + deleted = await _force_delete_documents_db(search_space_id) + if deleted: + print(f"\n[purge] Deleted {deleted} stale document(s) from search space {search_space_id}") yield @@ -134,13 +116,30 @@ def cleanup_doc_ids() -> list[int]: async def _cleanup_documents( client: httpx.AsyncClient, headers: dict[str, str], + search_space_id: int, cleanup_doc_ids: list[int], ): """ - Runs after every test. Deletes any document IDs that were appended to - the ``cleanup_doc_ids`` list during the test body. + Runs after every test. Tries the API first for clean deletes, then + falls back to direct DB access for any stuck documents. """ yield + + remaining_ids: list[int] = [] for doc_id in cleanup_doc_ids: - with contextlib.suppress(Exception): - await delete_document(client, headers, doc_id) + try: + resp = await delete_document(client, headers, doc_id) + if resp.status_code == 409: + remaining_ids.append(doc_id) + except Exception: + remaining_ids.append(doc_id) + + if remaining_ids: + conn = await asyncpg.connect(DATABASE_URL) + try: + await conn.execute( + "DELETE FROM documents WHERE id = ANY($1::int[])", + remaining_ids, + ) + finally: + await conn.close()