test(backend): add E2E harness foundation (entrypoints, middleware, LLM/embedding fakes)

This commit is contained in:
Anish Sarkar 2026-05-06 17:17:42 +05:30
parent c720866a67
commit 58ba95fad2
9 changed files with 550 additions and 0 deletions

View file

@ -0,0 +1,69 @@
# Backend E2E Test Harness
Strict fakes + alternative entrypoints used **only** by Playwright E2E.
Excluded from the production Docker image via `.dockerignore`.
## Files
| Path | Role |
| -------------------------------- | ------------------------------------------------------------------------------- |
| `run_backend.py` | FastAPI entrypoint that hijacks `sys.modules` before importing `app.app:app` |
| `run_celery.py` | Celery worker entrypoint with the same hijack + patch logic |
| `middleware/scenario.py` | `X-E2E-Scenario` header → ContextVar (read by fakes) |
| `fakes/composio_module.py` | Strict drop-in for the `composio` package; raises on unknown surface |
| `fakes/llm.py` | `fake_get_user_long_context_llm` returning a `FakeListChatModel` |
| `fakes/embeddings.py` | Deterministic 0.1-vector `embed_text` / `embed_texts` |
| `fakes/fixtures/drive_files.json` | Canned Drive listings + file contents (incl. canary tokens) |
## Why a sys.modules hijack?
Production code does `from composio import Composio` at module load
time. By the time the FastAPI app object exists, that binding has
already been resolved. The hijack runs **before** any `app.*` import,
so the binding resolves to our strict fake. No production source
changes; fakes are physically excluded from production images.
Belt + suspenders + no internet: the strict `__getattr__` in every
fake raises `NotImplementedError` if a future production code path
introduces a new SDK call. CI also sets `HTTPS_PROXY=http://127.0.0.1:1`
plus sentinel API keys so any leaked outbound HTTP fails immediately.
## Adding a new fake
1. Create `fakes/<sdk>_module.py` modelled on `composio_module.py`.
2. In `run_backend.py` and `run_celery.py`, register
`sys.modules["<sdk>"] = _fake_<sdk>` before the `from app.app import app`
line.
3. If the new fake needs scenario branching, read from
`tests.e2e.middleware.scenario.current_scenario()`.
## Reused by backend integration tests
The strict fakes are not only for Playwright. Backend route integration
tests can import the same fake before importing `app.app`, so Composio
route tests exercise production route code without touching the real
SDK:
```python
from tests.e2e.fakes import composio_module as _fake_composio
sys.modules["composio"] = _fake_composio
from app.app import app
```
See `surfsense_backend/tests/integration/composio/conftest.py` for the
current pattern.
## Running locally
```bash
cd surfsense_backend
uv run python tests/e2e/run_backend.py
# in a second shell:
uv run python tests/e2e/run_celery.py
```
Then in `surfsense_web`:
```bash
pnpm test:e2e
```

View file

@ -0,0 +1,7 @@
"""E2E test harness root.

This package is loaded only by the test entrypoints
(`tests/e2e/run_backend.py`, `tests/e2e/run_celery.py`). It is excluded
from the production Docker image via `surfsense_backend/.dockerignore`,
so production builds never contain this code.
"""

View file

@ -0,0 +1,8 @@
"""Strict fakes for third-party SDKs, used in E2E mode only.

Every fake here implements a ``__getattr__`` that raises NotImplementedError
on any unknown surface. Combined with sys.modules-level hijacking in
run_backend.py / run_celery.py, this makes silent pass-through to the
real SDK impossible: a future production code path that introduces a
new SDK call site fails CI with a clear "add this to the fake" message.
"""

View file

@ -0,0 +1,79 @@
"""Deterministic embedding fakes for E2E.

Mirrors the existing `patched_embed_texts` fixture in
`surfsense_backend/tests/integration/conftest.py`:

    MagicMock(side_effect=lambda texts: [[0.1] * _EMBEDDING_DIM for _ in texts])

The dimension matches whatever `config.embedding_model_instance.dimension`
returns in the running process so the fakes are vector-compatible with
the documents.embedding pgvector column.
"""

from __future__ import annotations

import logging
from typing import Any

import numpy as np

from app.config import config

logger = logging.getLogger(__name__)
def _embedding_dim() -> int:
    """Resolve the embedding dimension lazily, per call.

    Read from `config.embedding_model_instance` at call time (not import
    time) so the fakes work with whichever embedding model the running
    process configured.
    """
    dimension = config.embedding_model_instance.dimension
    return int(dimension)
def fake_embed_text(text: str) -> np.ndarray:
    """Deterministic single-text embedding: every component is 0.1."""
    dim = _embedding_dim()
    return np.full((dim,), 0.1, dtype=np.float32)
def fake_embed_texts(texts: list[str]) -> list[np.ndarray]:
    """Deterministic batch embedding: one 0.1-filled vector per input text."""
    if not texts:
        return []
    vector_shape = (_embedding_dim(),)
    return [np.full(vector_shape, 0.1, dtype=np.float32) for _ in texts]
def install(patches: list[Any]) -> None:
    """Install embedding patches at every binding site we know about.

    The caller supplies `patches`, a list that the entrypoint tracks so
    the started patch objects stay alive. In principle they could be
    stopped on shutdown — we intentionally never stop them, because the
    process exits when the test server stops.

    Raises:
        RuntimeError: if any known binding site cannot be patched.
            Failing loudly is deliberate: silent passthrough to a real
            embedding model would be expensive and non-deterministic.
    """
    from unittest.mock import patch as _patch

    bindings = {
        # Source binding (where the real implementation lives)
        "app.utils.document_converters.embed_text": fake_embed_text,
        "app.utils.document_converters.embed_texts": fake_embed_texts,
        # Consumers that did `from app.utils.document_converters import embed_text/texts`
        "app.indexing_pipeline.document_embedder.embed_text": fake_embed_text,
        "app.indexing_pipeline.document_embedder.embed_texts": fake_embed_texts,
        # Pipeline service binding (the actual call site for indexing.index)
        "app.indexing_pipeline.indexing_pipeline_service.embed_texts": fake_embed_texts,
    }
    for target, replacement in bindings.items():
        try:
            patcher = _patch(target, replacement)
            patcher.start()
        except (ModuleNotFoundError, AttributeError) as exc:
            # A refactor moved this binding: fail loudly so the harness
            # is updated instead of silently hitting a real model.
            raise RuntimeError(
                f"Could not patch embedding binding {target!r}: {exc!s}. "
                f"Update surfsense_backend/tests/e2e/fakes/embeddings.py "
                f"to point at the new binding site."
            ) from exc
        patches.append(patcher)
        logger.info("[fake-embeddings] patched %s", target)

View file

@ -0,0 +1,48 @@
"""Deterministic LLM fake for the E2E indexing pipeline.

The production indexing pipeline summarizes documents with:

    summary_chain = SUMMARY_PROMPT_TEMPLATE | llm
    summary_result = await summary_chain.ainvoke({"document": ...})
    summary_content = summary_result.content

The `llm` parameter is supplied per-document by
`app.services.llm_service.get_user_long_context_llm`. We patch THAT
function to return a langchain-native FakeListChatModel so the rest of
the chain works unchanged. No real LLM provider package is touched.

run_backend.py / run_celery.py use unittest.mock.patch.start() to install
this at every binding site (the source module + every consumer that
did `from app.services.llm_service import get_user_long_context_llm`
at module load time).
"""

from __future__ import annotations

import logging
from typing import Any

from langchain_core.language_models.fake_chat_models import FakeListChatModel

logger = logging.getLogger(__name__)
def _make_fake_llm() -> FakeListChatModel:
    """Build a fresh FakeListChatModel yielding one deterministic summary.

    FakeListChatModel cycles through `responses` for each invocation, so a
    single entry means every summary is identical. The content carries a
    marker that specs CAN assert on if they want, but the primary indexing
    assertion is on the file content (chunked + stored separately by the
    pipeline).
    """
    deterministic_summary = (
        "E2E_FAKE_SUMMARY: Indexed by Playwright E2E run with deterministic LLM stub."
    )
    return FakeListChatModel(responses=[deterministic_summary])
async def fake_get_user_long_context_llm(*args: Any, **kwargs: Any) -> Any:
    """Drop-in replacement for app.services.llm_service.get_user_long_context_llm.

    Accepts (and ignores) whatever the production signature passes; always
    returns a fresh deterministic FakeListChatModel.
    """
    logger.info("[fake-llm] returning FakeListChatModel for E2E indexing")
    model = _make_fake_llm()
    return model

View file

@ -0,0 +1,4 @@
"""Test-only middleware. Mounted on the FastAPI `app` object inside
`tests/e2e/run_backend.py`, never registered by production startup
(`python main.py`).
"""

View file

@ -0,0 +1,54 @@
"""X-E2E-Scenario middleware.

Reads the X-E2E-Scenario request header and pipes the value into a
ContextVar that the strict fakes consult to switch between happy-path
and error scenarios on a per-request basis.

Mounted by tests/e2e/run_backend.py only. Production never adds this
middleware, so production never reads the header.

Supported scenarios:
- "happy" (default): everything succeeds with deterministic fixtures.
- "denied": Composio.connected_accounts.initiate returns a redirect URL
  pointing at our callback with ?error=access_denied.
- "auth_expired": GOOGLEDRIVE_LIST_FILES returns an authentication
  failure that the route translates to connector.config.auth_expired.
- "duplicate": no special fake behavior; the duplicate path is exercised
  by running the OAuth flow twice with the same toolkit.
"""

from __future__ import annotations

from contextvars import ContextVar

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import ASGIApp
_scenario: ContextVar[str] = ContextVar("e2e_scenario", default="happy")
def current_scenario() -> str:
"""Return the active E2E scenario for the current request context."""
return _scenario.get()
class ScenarioMiddleware(BaseHTTPMiddleware):
    """Exposes the X-E2E-Scenario header through a ContextVar.

    The value is also stashed on `request.state` so route handlers can
    branch on it directly if they ever need to (Composio routes do not).
    Missing header falls back to "happy".
    """

    def __init__(self, app: ASGIApp) -> None:
        super().__init__(app)

    async def dispatch(self, request: Request, call_next) -> Response:
        scenario = request.headers.get("X-E2E-Scenario", "happy")
        request.state.e2e_scenario = scenario
        token = _scenario.set(scenario)
        try:
            return await call_next(request)
        finally:
            # Always restore, even on handler errors, so scenarios never
            # leak between requests sharing a context.
            _scenario.reset(token)

View file

@ -0,0 +1,156 @@
"""E2E backend entrypoint.

Hijacks third-party SDKs at sys.modules level BEFORE any production
code is imported, then starts the same FastAPI app + uvicorn that
`main.py` would run.

Production code is byte-identical with or without this file:
- `python main.py` is the production entrypoint (unchanged).
- `python tests/e2e/run_backend.py` is the test entrypoint, never imported by production.
- `surfsense_backend/.dockerignore` excludes `tests/`, so this file
  physically does not exist in the production Docker image.

Defense in depth (see Composio Drive E2E Phase 1 plan):
1. sys.modules hijack here (Composio).
2. Strict __getattr__ inside fakes (NotImplementedError on unknown surface).
3. Network deny-list set in CI env (HTTPS_PROXY=http://127.0.0.1:1
   plus sentinel API keys) so any leaked outbound HTTP fails loudly.

Usage:
    cd surfsense_backend
    uv run python tests/e2e/run_backend.py
"""

from __future__ import annotations

import logging
import os
import sys

# ---------------------------------------------------------------------------
# 1) Hijack sys.modules BEFORE any production import.
#    Production: composio_service.py:11 does `from composio import Composio`.
#    With this hijack in place, that import resolves to our strict fake.
# ---------------------------------------------------------------------------
# Make the surfsense_backend root importable as a top-level package so
# `import tests.e2e.fakes...` works regardless of how the entrypoint is
# invoked (uv run python tests/e2e/run_backend.py from repo root or from
# surfsense_backend/).
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_BACKEND_ROOT = os.path.abspath(os.path.join(_THIS_DIR, "..", ".."))
if _BACKEND_ROOT not in sys.path:
    sys.path.insert(0, _BACKEND_ROOT)

import tests.e2e.fakes.composio_module as _fake_composio  # noqa: E402

sys.modules["composio"] = _fake_composio

# ---------------------------------------------------------------------------
# 2) Standard logging + dotenv so the rest of the app behaves like main.py.
# ---------------------------------------------------------------------------
from dotenv import load_dotenv  # noqa: E402

load_dotenv()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("surfsense.e2e.backend")
# Loud banner so a misdeployed harness is obvious in logs.
logger.warning(
    "*** SURFSENSE E2E BACKEND ENTRYPOINT — fake Composio + LLM + embeddings, "
    "this MUST NOT be reachable in production. ***"
)

# ---------------------------------------------------------------------------
# 3) Now import the production app. Every module in app.* loads here,
#    creating their bindings (some of which we will patch in step 4).
# ---------------------------------------------------------------------------
# 4) Patch LLM + embedding bindings at every consumer site.
#    Composio is already covered by the sys.modules hijack in step 1.
# ---------------------------------------------------------------------------
from unittest.mock import patch  # noqa: E402

from app.app import app  # noqa: E402
from tests.e2e.fakes import embeddings as _fake_embeddings  # noqa: E402
from tests.e2e.fakes.llm import fake_get_user_long_context_llm  # noqa: E402

# Started patch objects live here for the lifetime of the process; they
# are deliberately never stopped (process exit cleans up).
_active_patches: list = []
def _patch_llm_bindings() -> None:
    """Replace get_user_long_context_llm at every known binding site.

    Patch failures are logged (not fatal): some indexers may not be
    loaded in every environment, but a known binding must never be let
    through silently.
    """
    known_bindings = (
        "app.services.llm_service.get_user_long_context_llm",
        "app.tasks.connector_indexers.google_drive_indexer.get_user_long_context_llm",
        "app.tasks.connector_indexers.google_gmail_indexer.get_user_long_context_llm",
        "app.tasks.connector_indexers.local_folder_indexer.get_user_long_context_llm",
        "app.tasks.document_processors.file_processors.get_user_long_context_llm",
    )
    for target in known_bindings:
        try:
            patcher = patch(target, fake_get_user_long_context_llm)
            patcher.start()
        except (ModuleNotFoundError, AttributeError) as exc:
            logger.warning(
                "[fake-llm] could not patch %s: %s. If production code "
                "uses this path in E2E it will hit the real provider; "
                "update tests/e2e/run_backend.py.",
                target,
                exc,
            )
        else:
            _active_patches.append(patcher)
            logger.info("[fake-llm] patched %s", target)
# Apply the LLM + embedding patches now, before any request is served.
_patch_llm_bindings()
_fake_embeddings.install(_active_patches)

# ---------------------------------------------------------------------------
# 5) Mount test-only middleware. Production never reaches this code.
# ---------------------------------------------------------------------------
from tests.e2e.middleware.scenario import ScenarioMiddleware  # noqa: E402

app.add_middleware(ScenarioMiddleware)

# ---------------------------------------------------------------------------
# 6) Start uvicorn, mirroring main.py's behaviour.
# ---------------------------------------------------------------------------
import asyncio
import uvicorn
def _main() -> None:
    """Run uvicorn against the patched app, mirroring main.py's behaviour.

    Host/port/log level come from UVICORN_* env vars with the same
    defaults main.py would use.
    """
    if sys.platform == "win32":
        # NOTE(review): selector loop policy on Windows presumably matches
        # what main.py does for dependency compatibility — confirm.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    server_config = uvicorn.Config(
        app=app,
        host=os.getenv("UVICORN_HOST", "0.0.0.0"),
        port=int(os.getenv("UVICORN_PORT", "8000")),
        log_level=os.getenv("UVICORN_LOG_LEVEL", "info"),
        reload=False,
    )
    uvicorn.Server(server_config).run()


if __name__ == "__main__":
    _main()

View file

@ -0,0 +1,125 @@
"""E2E Celery worker entrypoint.

Same sys.modules hijack + LLM/embedding patches as run_backend.py,
applied before importing the production celery_app. Celery workers
run in a separate Python interpreter, so the patches must be applied
here too — they would NOT carry over from the FastAPI process.

Production is unaffected: celery_worker.py at the repo root is the
production entrypoint and never imports this file.

Usage:
    cd surfsense_backend
    uv run python tests/e2e/run_celery.py
"""

from __future__ import annotations

import logging
import os
import sys

# Make the surfsense_backend root importable as a top-level package,
# mirroring tests/e2e/run_backend.py.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_BACKEND_ROOT = os.path.abspath(os.path.join(_THIS_DIR, "..", ".."))
if _BACKEND_ROOT not in sys.path:
    sys.path.insert(0, _BACKEND_ROOT)

# ---------------------------------------------------------------------------
# 1) Hijack sys.modules BEFORE production celery imports anything.
# ---------------------------------------------------------------------------
import tests.e2e.fakes.composio_module as _fake_composio  # noqa: E402

sys.modules["composio"] = _fake_composio

# ---------------------------------------------------------------------------
# 2) Logging + dotenv.
# ---------------------------------------------------------------------------
from dotenv import load_dotenv  # noqa: E402

load_dotenv()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("surfsense.e2e.celery")
# Loud banner so a misdeployed harness is obvious in logs.
logger.warning(
    "*** SURFSENSE E2E CELERY WORKER — fake Composio + LLM + embeddings, "
    "this MUST NOT be reachable in production. ***"
)

# ---------------------------------------------------------------------------
# 3) Import the production celery_app. All task modules load here.
# ---------------------------------------------------------------------------
from app.celery_app import celery_app  # noqa: E402

# ---------------------------------------------------------------------------
# 4) Patch LLM + embedding bindings inside the worker process.
# ---------------------------------------------------------------------------
from unittest.mock import patch  # noqa: E402

from tests.e2e.fakes import embeddings as _fake_embeddings  # noqa: E402
from tests.e2e.fakes.llm import fake_get_user_long_context_llm  # noqa: E402

# Started patch objects live here for the lifetime of the worker process;
# they are deliberately never stopped (process exit cleans up).
_active_patches: list = []
def _patch_llm_bindings() -> None:
    """Replace get_user_long_context_llm at every known binding site.

    Failures are logged rather than fatal: some indexers may not be
    loaded in every environment.
    """
    known_bindings = (
        "app.services.llm_service.get_user_long_context_llm",
        "app.tasks.connector_indexers.google_drive_indexer.get_user_long_context_llm",
        "app.tasks.connector_indexers.google_gmail_indexer.get_user_long_context_llm",
        "app.tasks.connector_indexers.local_folder_indexer.get_user_long_context_llm",
        "app.tasks.document_processors.file_processors.get_user_long_context_llm",
    )
    for target in known_bindings:
        try:
            patcher = patch(target, fake_get_user_long_context_llm)
            patcher.start()
        except (ModuleNotFoundError, AttributeError) as exc:
            logger.warning(
                "[fake-llm] could not patch %s in celery worker: %s.",
                target,
                exc,
            )
        else:
            _active_patches.append(patcher)
            logger.info("[fake-llm] patched %s in celery worker", target)
# Apply the patches now, before the worker starts consuming tasks.
_patch_llm_bindings()
_fake_embeddings.install(_active_patches)

# ---------------------------------------------------------------------------
# 5) Start the worker.
# ---------------------------------------------------------------------------
def _main() -> None:
    """Start a Celery worker on the default + connectors queues.

    Default queues mirror production (default queue plus the connectors
    queue so Drive indexing tasks are picked up).
    """
    default_queue = os.getenv("CELERY_TASK_DEFAULT_QUEUE", "surfsense")
    worker_argv = [
        "worker",
        "--loglevel=info",
        f"--queues={default_queue},{default_queue}.connectors",
        "--concurrency=2",
        "--without-gossip",
        "--without-mingle",
    ]
    celery_app.worker_main(argv=worker_argv)


if __name__ == "__main__":
    _main()