test: add notifications unit tests and conventions doc

This commit is contained in:
CREDO23 2026-06-03 21:53:06 +02:00
parent 3f770203ca
commit 1165b3ad5c
7 changed files with 512 additions and 0 deletions

View file

@ -0,0 +1,62 @@
# Tests
How the backend test suite is organized and the conventions to follow when adding tests.
## Layout: type-first, module-mirrored
Tests are split by **type** at the top level, and each type **mirrors the `app/` module tree** inside:
```
tests/
├── conftest.py # global fixtures + DATABASE_URL pinning
├── unit/ # pure logic: no DB, no app, no network
│ └── notifications/
│ ├── api/test_transform.py
│ └── service/
│ ├── messages/test_connector_indexing.py
│ └── test_metadata.py
└── integration/ # real PostgreSQL (pgvector)
├── conftest.py # async engine, transactional db_session, db_user, ...
└── notifications/
├── conftest.py # module-scoped fixtures (e.g. transactional client)
└── test_*_handler.py
```
To find a feature's tests, look under `tests/<type>/<same path as app/>`.
## Unit vs integration
- `@pytest.mark.unit` — pure, fast, no I/O. Test behavior through a public function's inputs/outputs.
- `@pytest.mark.integration` — requires a real database. Run with `AUTH_TYPE=LOCAL`.
Maximize logic covered by unit tests; keep integration tests for what genuinely needs the DB (persistence, SQL filters, scoping, HTTP wiring).
## Principles
- **Behavior, not implementation.** Assert observable outputs (returned values, persisted rows, HTTP responses), never private helpers. Tests should survive a refactor.
- **Functional core / imperative shell.** Put pure decision logic in a side-effect-free module (e.g. `app/notifications/service/messages/`) so it is unit-testable; keep the persistence shell thin and cover it with a few integration tests.
- **One responsibility per test file**, mirroring the slice it covers.
- **Mock only at system boundaries** (external APIs, brokers), never internal collaborators. Prefer dependency overrides and the transactional `db_session` over mocks.
## Fixtures
`conftest.py` is scoped to its directory and below. Keep truly global fixtures in `tests/conftest.py`; put module-specific fixtures in that module's `conftest.py` so a DB fixture never loads for a pure unit test.
For API integration tests, override `get_async_session` and `current_active_user` to ride the test's transactional `db_session` (see `tests/integration/notifications/conftest.py`): rows seeded in the test and rows read via the endpoint share one transaction that rolls back automatically.
## Import mode
The suite uses `--import-mode=importlib` with `pythonpath = ["."]` (see `pyproject.toml`). This lets test files share basenames across modules (e.g. many `test_api.py`) without `__init__.py` boilerplate; new test directories do not need an `__init__.py`.
## Running
```bash
# fast unit tests
uv run pytest -m unit
# integration (needs Postgres + pgvector)
AUTH_TYPE=LOCAL uv run pytest -m integration
# a single module's tests
uv run pytest tests/unit/notifications
```

View file

@ -0,0 +1,94 @@
"""Unit tests for pure notifications API request/response helpers."""
from __future__ import annotations
import uuid
from datetime import UTC, datetime
import pytest
from app.notifications.api.transform import (
parse_before_date,
parse_source_type,
to_response,
)
from app.notifications.persistence import Notification
pytestmark = pytest.mark.unit
class TestParseSourceType:
def test_connector_prefix(self):
"""A 'connector:' filter selects the connector types and JSONB facet."""
parsed = parse_source_type("connector:GITHUB_CONNECTOR")
assert parsed.types == ("connector_indexing", "connector_deletion")
assert parsed.metadata_key == "connector_type"
assert parsed.value == "GITHUB_CONNECTOR"
def test_doctype_prefix(self):
"""A 'doctype:' filter selects the document type and JSONB facet."""
parsed = parse_source_type("doctype:FILE")
assert parsed.types == ("document_processing",)
assert parsed.metadata_key == "document_type"
assert parsed.value == "FILE"
def test_unknown_prefix_returns_none(self):
"""An unrecognized prefix yields no filter."""
assert parse_source_type("mystery:thing") is None
class TestParseBeforeDate:
def test_parses_iso_with_zulu(self):
"""An ISO date with a 'Z' suffix parses to a UTC datetime."""
parsed = parse_before_date("2024-01-15T00:00:00Z")
assert parsed == datetime(2024, 1, 15, tzinfo=UTC)
def test_invalid_raises_value_error(self):
"""A malformed date raises ValueError for the endpoint to turn into a 400."""
with pytest.raises(ValueError):
parse_before_date("not-a-date")
def _notification(**overrides) -> Notification:
defaults = dict(
id=1,
user_id=uuid.uuid4(),
search_space_id=3,
type="document_processing",
title="Title",
message="Message",
read=False,
notification_metadata={"k": "v"},
created_at=datetime(2024, 1, 1, tzinfo=UTC),
updated_at=datetime(2024, 1, 2, tzinfo=UTC),
)
defaults.update(overrides)
return Notification(**defaults)
class TestToResponse:
def test_maps_core_fields(self):
"""A persisted notification maps its core fields onto the response shape."""
notification = _notification()
response = to_response(notification)
assert response.id == 1
assert response.user_id == str(notification.user_id)
assert response.type == "document_processing"
assert response.metadata == {"k": "v"}
assert response.created_at == "2024-01-01T00:00:00+00:00"
assert response.updated_at == "2024-01-02T00:00:00+00:00"
def test_missing_updated_at_maps_to_none(self):
"""A missing updated_at is represented as None in the response."""
response = to_response(_notification(updated_at=None))
assert response.updated_at is None
def test_missing_created_at_maps_to_empty_string(self):
"""A missing created_at is represented as an empty string in the response."""
response = to_response(_notification(created_at=None))
assert response.created_at == ""
def test_null_metadata_maps_to_empty_dict(self):
"""Null metadata is normalized to an empty dict in the response."""
response = to_response(_notification(notification_metadata=None))
assert response.metadata == {}

View file

@ -0,0 +1,176 @@
"""Unit tests for connector-indexing presentation logic."""
from __future__ import annotations
import pytest
from app.notifications.service.messages import connector_indexing as msg
pytestmark = pytest.mark.unit
class TestOperationId:
def test_encodes_connector_id(self):
"""The operation id embeds the connector id."""
assert msg.operation_id(42).startswith("connector_42_")
def test_appends_date_range_when_given(self):
"""A start/end date range is appended to the operation id."""
op = msg.operation_id(42, start_date="2024-01-01", end_date="2024-02-01")
assert op.endswith("_2024-01-01_2024-02-01")
def test_uses_none_placeholder_for_open_ended_range(self):
"""A missing range bound is encoded as the 'none' placeholder."""
assert msg.operation_id(42, start_date="2024-01-01").endswith(
"_2024-01-01_none"
)
def test_google_drive_encodes_counts(self):
"""The Drive operation id embeds connector id plus folder/file counts."""
op = msg.google_drive_operation_id(7, folder_count=2, file_count=5)
assert op.startswith("drive_7_")
assert op.endswith("_2f_5files")
class TestProgress:
def test_known_stage_maps_to_message(self):
"""A known stage maps to its user-facing message and is recorded."""
message, meta = msg.progress(3, stage="fetching")
assert message == "Fetching your content"
assert meta["indexed_count"] == 3
assert meta["sync_stage"] == "fetching"
def test_unknown_stage_falls_back_to_processing(self):
"""An unrecognized stage falls back to a generic 'Processing' message."""
message, _ = msg.progress(1, stage="weird")
assert message == "Processing"
def test_stage_message_overrides_mapping(self):
"""An explicit stage message overrides the stage-to-message mapping."""
message, _ = msg.progress(1, stage="fetching", stage_message="Custom")
assert message == "Custom"
def test_no_stage_uses_legacy_default(self):
"""With neither stage nor message, the legacy default message is used."""
message, meta = msg.progress(1)
assert message == "Fetching your content"
assert "sync_stage" not in meta
def test_total_count_yields_percent(self):
"""Supplying a total count produces a progress percentage."""
_, meta = msg.progress(5, total_count=10)
assert meta["total_count"] == 10
assert meta["progress_percent"] == 50
class TestRetry:
def test_strips_workspace_suffix_from_connector_name(self):
"""The provider name is derived by stripping the workspace suffix."""
message, _ = msg.retry("Notion - My Workspace", 0, "rate_limit", 1, 3)
assert message == "Notion rate limit reached. Retrying..."
def test_explicit_service_name_wins(self):
"""An explicit service name overrides the connector-derived name."""
message, _ = msg.retry(
"Notion - WS", 0, "rate_limit", 1, 3, service_name="Slack"
)
assert message.startswith("Slack rate limit reached")
@pytest.mark.parametrize(
("reason", "expected"),
[
("rate_limit", "Notion rate limit reached"),
("server_error", "Notion is slow to respond"),
("timeout", "Notion took too long"),
("temporary_error", "Notion temporarily unavailable"),
("something_else", "Waiting for Notion"),
],
)
def test_reason_wording(self, reason, expected):
"""Each retry reason maps to its wording; unknown reasons get a fallback."""
message, _ = msg.retry("Notion", 0, reason, 1, 3)
assert message.startswith(expected)
def test_long_wait_shows_seconds(self):
"""A wait longer than the threshold surfaces the retry delay in seconds."""
message, _ = msg.retry("Notion", 0, "rate_limit", 1, 3, wait_seconds=10)
assert "Retrying in 10s..." in message
def test_short_wait_is_hidden(self):
"""A short wait is not worth showing, so no seconds are surfaced."""
message, _ = msg.retry("Notion", 0, "rate_limit", 1, 3, wait_seconds=3)
assert message.endswith("Retrying...")
def test_synced_count_suffix_singular_and_plural(self):
"""Already-synced items are appended with correct singular/plural wording."""
one, _ = msg.retry("Notion", 1, "rate_limit", 1, 3)
many, _ = msg.retry("Notion", 2, "rate_limit", 1, 3)
assert one.endswith("(1 item synced so far)")
assert many.endswith("(2 items synced so far)")
def test_metadata_records_retry_state(self):
"""Retry metadata captures the attempt, reason, and wait state."""
_, meta = msg.retry("Notion", 0, "rate_limit", 2, 5, wait_seconds=8)
assert meta["sync_stage"] == "waiting_retry"
assert meta["retry_attempt"] == 2
assert meta["retry_max_attempts"] == 5
assert meta["retry_reason"] == "rate_limit"
assert meta["retry_wait_seconds"] == 8
class TestCompletion:
def test_clean_success_plural(self):
"""A clean multi-file sync reports ready/completed with plural wording."""
title, message, status, meta = msg.completion("GitHub", 3)
assert title == "Ready: GitHub"
assert message == "Now searchable! 3 files synced."
assert status == "completed"
assert meta["sync_stage"] == "completed"
def test_clean_success_singular(self):
"""A single synced file uses singular 'file' wording."""
_, message, _, _ = msg.completion("GitHub", 1)
assert message == "Now searchable! 1 file synced."
def test_nothing_to_sync(self):
"""Zero new items with no error reports 'Already up to date!'."""
_, message, status, _ = msg.completion("GitHub", 0)
assert message == "Already up to date!"
assert status == "completed"
def test_hard_failure(self):
"""An error with nothing synced reports a hard failure."""
title, message, status, meta = msg.completion("GitHub", 0, error_message="boom")
assert title == "Failed: GitHub"
assert message == "Sync failed: boom"
assert status == "failed"
assert meta["sync_stage"] == "failed"
def test_partial_success_with_error_note(self):
"""An error after partial progress still completes, with an appended note."""
title, message, status, _ = msg.completion("GitHub", 2, error_message="flaky")
assert title == "Ready: GitHub"
assert message == "Now searchable! 2 files synced. Note: flaky"
assert status == "completed"
def test_warning_is_treated_as_complete(self):
"""A warning-level error completes the run rather than failing it."""
title, message, status, _ = msg.completion(
"GitHub", 0, error_message="partial", is_warning=True
)
assert title == "Ready: GitHub"
assert message == "Sync complete. partial"
assert status == "completed"
def test_unsupported_files_note_singular_and_plural(self):
"""Unsupported-file counts are described with correct singular/plural wording."""
_, one, _, _ = msg.completion("GitHub", 2, unsupported_count=1)
_, many, _, _ = msg.completion("GitHub", 2, unsupported_count=3)
assert "1 file was not supported." in one
assert "3 files were not supported." in many
def test_zero_indexed_with_unsupported_reports_complete(self):
"""Nothing synced but some unsupported files still reports completion."""
_, message, status, _ = msg.completion("GitHub", 0, unsupported_count=2)
assert message == "Sync complete. 2 files were not supported."
assert status == "completed"

View file

@ -0,0 +1,63 @@
"""Unit tests for document-processing presentation logic."""
from __future__ import annotations
import pytest
from app.notifications.service.messages import document_processing as msg
pytestmark = pytest.mark.unit
def test_operation_id_encodes_type_and_space():
"""The operation id embeds the document type and search space id."""
op = msg.operation_id("FILE", "report.pdf", 9)
assert op.startswith("doc_FILE_9_")
@pytest.mark.parametrize(
("stage", "expected"),
[
("parsing", "Reading your file"),
("chunking", "Preparing for search"),
("embedding", "Preparing for search"),
("storing", "Finalizing"),
("unknown", "Processing"),
],
)
def test_progress_stage_messages(stage, expected):
"""Each processing stage maps to its message; unknown stages get a fallback."""
message, meta = msg.progress(stage)
assert message == expected
assert meta["processing_stage"] == stage
def test_progress_records_chunks_count():
"""A provided chunk count is stored in metadata for debugging."""
_, meta = msg.progress("chunking", chunks_count=12)
assert meta["chunks_count"] == 12
def test_progress_message_override():
"""An explicit stage message overrides the stage mapping."""
message, _ = msg.progress("parsing", stage_message="Scanning")
assert message == "Scanning"
def test_completion_success():
"""A successful run reports ready/completed and records the document id."""
title, message, status, meta = msg.completion("report.pdf", document_id=5)
assert title == "Ready: report.pdf"
assert message == "Now searchable!"
assert status == "completed"
assert meta["document_id"] == 5
assert meta["processing_stage"] == "completed"
def test_completion_failure():
"""An error reports failed status with the error surfaced in the message."""
title, message, status, meta = msg.completion("report.pdf", error_message="bad")
assert title == "Failed: report.pdf"
assert message == "Processing failed: bad"
assert status == "failed"
assert meta["processing_stage"] == "failed"

View file

@ -0,0 +1,30 @@
"""Unit tests for page-limit presentation logic."""
from __future__ import annotations
import pytest
from app.notifications.service.messages import page_limit as msg
pytestmark = pytest.mark.unit
def test_operation_id_encodes_search_space():
"""The operation id embeds the search space id."""
assert msg.operation_id("doc.pdf", 9).startswith("page_limit_9_")
def test_summary_title_and_message():
"""The summary states the document and the used/limit page counts."""
title, message = msg.summary("short.pdf", pages_used=95, pages_limit=100, pages_to_add=10)
assert title == "Page limit exceeded: short.pdf"
assert message == (
"This document has ~10 page(s) but you've used 95/100 pages. "
"Upgrade to process more documents."
)
def test_summary_truncates_long_name():
"""A long document name is truncated in the title."""
title, _ = msg.summary("a" * 50, pages_used=1, pages_limit=2, pages_to_add=1)
assert title == f"Page limit exceeded: {'a' * 40}..."

View file

@ -0,0 +1,24 @@
"""Unit tests for shared notification text helpers."""
from __future__ import annotations
import pytest
from app.notifications.service.messages.text import truncate
pytestmark = pytest.mark.unit
def test_truncate_leaves_short_text_unchanged():
"""Text under the limit is returned verbatim, with no ellipsis."""
assert truncate("hello", 100) == "hello"
def test_truncate_keeps_text_at_exact_limit():
"""Text exactly at the limit is not truncated."""
assert truncate("a" * 40, 40) == "a" * 40
def test_truncate_appends_ellipsis_when_over_limit():
"""Text past the limit is cut to the limit and gains an ellipsis."""
assert truncate("a" * 41, 40) == "a" * 40 + "..."

View file

@ -0,0 +1,63 @@
"""Unit tests for pure notification metadata transitions."""
from __future__ import annotations
import pytest
from app.notifications.service.metadata import apply_update, start_metadata
pytestmark = pytest.mark.unit
class TestStartMetadata:
def test_seeds_operation_and_progress_fields(self):
"""A new notification is seeded with operation id, in-progress status, and start time."""
meta = start_metadata("op-1")
assert meta["operation_id"] == "op-1"
assert meta["status"] == "in_progress"
assert "started_at" in meta
def test_preserves_initial_fields(self):
"""Caller-provided initial metadata is carried through."""
meta = start_metadata("op-1", {"connector_id": 7})
assert meta["connector_id"] == 7
def test_does_not_mutate_caller_dict(self):
"""Seeding returns a new dict without mutating the caller's input."""
initial = {"connector_id": 7}
start_metadata("op-1", initial)
assert initial == {"connector_id": 7}
class TestApplyUpdate:
def test_completed_stamps_completed_at(self):
"""A completed status records a completion timestamp."""
meta = apply_update({"status": "in_progress"}, status="completed")
assert meta["status"] == "completed"
assert "completed_at" in meta
def test_failed_stamps_completed_at(self):
"""A failed status also records a completion timestamp."""
meta = apply_update({}, status="failed")
assert "completed_at" in meta
def test_in_progress_does_not_stamp_completed_at(self):
"""A non-terminal status leaves the completion timestamp unset."""
meta = apply_update({}, status="in_progress")
assert "completed_at" not in meta
def test_merges_metadata_updates(self):
"""Metadata updates are merged into the existing metadata."""
meta = apply_update({"a": 1}, metadata_updates={"b": 2})
assert meta == {"a": 1, "b": 2}
def test_updates_override_existing_keys(self):
"""Updates take precedence over existing keys on conflict."""
meta = apply_update({"a": 1}, metadata_updates={"a": 9})
assert meta["a"] == 9
def test_does_not_mutate_caller_dict(self):
"""Applying updates returns a new dict without mutating the caller's input."""
current = {"a": 1}
apply_update(current, status="completed", metadata_updates={"b": 2})
assert current == {"a": 1}