feat: Enhance test document purging by implementing direct database access

This commit is contained in:
Anish Sarkar 2026-02-25 19:51:29 +05:30
parent c3273af20b
commit c564e5f768

View file

@ -2,11 +2,14 @@
from __future__ import annotations
import contextlib
import os
from collections.abc import AsyncGenerator
from pathlib import Path
import asyncpg
import httpx
import pytest
from dotenv import load_dotenv
from tests.utils.helpers import (
BACKEND_URL,
@ -14,9 +17,38 @@ from tests.utils.helpers import (
delete_document,
get_auth_token,
get_search_space_id,
poll_document_status,
)
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
DATABASE_URL = os.environ.get(
"TEST_DATABASE_URL",
os.environ.get("DATABASE_URL", ""),
).replace("postgresql+asyncpg://", "postgresql://")
async def _force_delete_documents_db(
search_space_id: int,
) -> int:
"""
Bypass the API and delete documents directly from the database.
This handles stuck documents in pending/processing state that the API
refuses to delete (409 Conflict). Chunks are cascade-deleted by the
foreign key constraint.
Returns the number of deleted rows.
"""
conn = await asyncpg.connect(DATABASE_URL)
try:
result = await conn.execute(
"DELETE FROM documents WHERE search_space_id = $1",
search_space_id,
)
return int(result.split()[-1])
finally:
await conn.close()
@pytest.fixture(scope="session")
def backend_url() -> str:
@ -43,68 +75,18 @@ async def search_space_id(backend_url: str, auth_token: str) -> int:
@pytest.fixture(scope="session", autouse=True)
async def _purge_test_search_space(
backend_url: str,
auth_token: str,
search_space_id: int,
):
"""
Delete all documents in the test search space before the session starts.
Ensures no stale data from a previous run interferes with the current
session. Paginates through all documents and waits for any in-flight
documents to reach a terminal state before deleting.
Uses direct database access to bypass the API's 409 protection on
pending/processing documents. This ensures stuck documents from
previous crashed runs are always cleaned up.
"""
hdrs = auth_headers(auth_token)
async with httpx.AsyncClient(base_url=backend_url, timeout=60.0) as client:
all_docs: list[dict] = []
page = 0
page_size = 200
while True:
resp = await client.get(
"/api/v1/documents",
headers=hdrs,
params={
"search_space_id": search_space_id,
"page": page,
"page_size": page_size,
},
)
if resp.status_code != 200:
break
body = resp.json()
all_docs.extend(body.get("items", []))
if not body.get("has_more", False):
break
page += 1
if not all_docs:
yield
return
in_flight = [
doc["id"]
for doc in all_docs
if doc.get("status", {}).get("state") in ("pending", "processing")
]
if in_flight:
with contextlib.suppress(Exception):
await poll_document_status(
client,
hdrs,
in_flight,
search_space_id=search_space_id,
timeout=120.0,
)
for doc in all_docs:
with contextlib.suppress(Exception):
await client.delete(
f"/api/v1/documents/{doc['id']}",
headers=hdrs,
)
deleted = await _force_delete_documents_db(search_space_id)
if deleted:
print(f"\n[purge] Deleted {deleted} stale document(s) from search space {search_space_id}")
yield
@ -134,13 +116,30 @@ def cleanup_doc_ids() -> list[int]:
async def _cleanup_documents(
client: httpx.AsyncClient,
headers: dict[str, str],
search_space_id: int,
cleanup_doc_ids: list[int],
):
"""
Runs after every test. Deletes any document IDs that were appended to
the ``cleanup_doc_ids`` list during the test body.
Runs after every test. Tries the API first for clean deletes, then
falls back to direct DB access for any stuck documents.
"""
yield
remaining_ids: list[int] = []
for doc_id in cleanup_doc_ids:
with contextlib.suppress(Exception):
await delete_document(client, headers, doc_id)
try:
resp = await delete_document(client, headers, doc_id)
if resp.status_code == 409:
remaining_ids.append(doc_id)
except Exception:
remaining_ids.append(doc_id)
if remaining_ids:
conn = await asyncpg.connect(DATABASE_URL)
try:
await conn.execute(
"DELETE FROM documents WHERE id = ANY($1::int[])",
remaining_ids,
)
finally:
await conn.close()