test: add notifications integration behavior guard

This commit is contained in:
CREDO23 2026-06-03 21:53:06 +02:00
parent 339ec31cad
commit 3f770203ca
8 changed files with 938 additions and 0 deletions

View file

@ -0,0 +1,53 @@
"""Notifications integration fixtures.
The app's DB session and current-user dependencies are overridden to ride the
test's transactional `db_session`, so API calls and seeded rows share one
transaction that rolls back per test. Overriding `current_active_user` also
bypasses real JWT auth, so these tests don't depend on AUTH_TYPE.
"""
from __future__ import annotations
from collections.abc import AsyncGenerator
import httpx
import pytest
import pytest_asyncio
from httpx import ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession
from app.app import app, limiter
from app.db import User, get_async_session
from app.users import current_active_user
pytestmark = pytest.mark.integration
limiter.enabled = False
@pytest_asyncio.fixture
async def client(
db_session: AsyncSession,
db_user: User,
) -> AsyncGenerator[httpx.AsyncClient, None]:
async def override_session() -> AsyncGenerator[AsyncSession, None]:
yield db_session
async def override_user() -> User:
return db_user
previous_overrides = app.dependency_overrides.copy()
app.dependency_overrides[get_async_session] = override_session
app.dependency_overrides[current_active_user] = override_user
try:
async with httpx.AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
timeout=30.0,
follow_redirects=False,
) as test_client:
yield test_client
finally:
app.dependency_overrides.clear()
app.dependency_overrides.update(previous_overrides)

View file

@ -0,0 +1,164 @@
"""Behavior guard for the shared find/upsert/update logic (BaseNotificationHandler).
Uses the connector-indexing handler instance to drive the base methods against
real Postgres, pinning upsert dedup, search-space scoping, and status stamping.
"""
from __future__ import annotations
import pytest
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSpace, User
from app.notifications.persistence import Notification
from app.notifications.service import NotificationService
pytestmark = pytest.mark.integration
handler = NotificationService.connector_indexing
async def test_find_or_create_creates_with_progress_metadata(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Creating a notification seeds operation id, in-progress status, and start time."""
notification = await handler.find_or_create_notification(
session=db_session,
user_id=db_user.id,
operation_id="op-create",
title="Title",
message="Message",
search_space_id=db_search_space.id,
)
assert notification.notification_metadata["operation_id"] == "op-create"
assert notification.notification_metadata["status"] == "in_progress"
assert "started_at" in notification.notification_metadata
async def test_find_or_create_upserts_same_operation(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Reusing an operation id updates the same row instead of creating a duplicate."""
first = await handler.find_or_create_notification(
session=db_session,
user_id=db_user.id,
operation_id="op-upsert",
title="First",
message="First message",
search_space_id=db_search_space.id,
)
second = await handler.find_or_create_notification(
session=db_session,
user_id=db_user.id,
operation_id="op-upsert",
title="Second",
message="Second message",
search_space_id=db_search_space.id,
)
assert second.id == first.id
assert second.title == "Second"
assert second.message == "Second message"
count = await db_session.scalar(
select(func.count(Notification.id)).where(
Notification.user_id == db_user.id,
Notification.notification_metadata["operation_id"].astext == "op-upsert",
)
)
assert count == 1
async def test_find_by_operation_is_scoped_to_search_space(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Operation-id lookup is scoped per search space, so other spaces don't match."""
await handler.find_or_create_notification(
session=db_session,
user_id=db_user.id,
operation_id="op-scoped",
title="Title",
message="Message",
search_space_id=db_search_space.id,
)
other_space = SearchSpace(name="Other Space", user_id=db_user.id)
db_session.add(other_space)
await db_session.flush()
found_other = await handler.find_notification_by_operation(
session=db_session,
user_id=db_user.id,
operation_id="op-scoped",
search_space_id=other_space.id,
)
assert found_other is None
found_same = await handler.find_notification_by_operation(
session=db_session,
user_id=db_user.id,
operation_id="op-scoped",
search_space_id=db_search_space.id,
)
assert found_same is not None
async def test_update_notification_completed_stamps_completed_at(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Completing a notification stamps completed_at and merges metadata updates."""
notification = await handler.find_or_create_notification(
session=db_session,
user_id=db_user.id,
operation_id="op-complete",
title="Title",
message="Message",
search_space_id=db_search_space.id,
)
updated = await handler.update_notification(
session=db_session,
notification=notification,
status="completed",
metadata_updates={"indexed_count": 7},
)
assert updated.notification_metadata["status"] == "completed"
assert "completed_at" in updated.notification_metadata
assert updated.notification_metadata["indexed_count"] == 7
async def test_update_notification_failed_stamps_completed_at(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Failing a notification also stamps completed_at for the terminal state."""
notification = await handler.find_or_create_notification(
session=db_session,
user_id=db_user.id,
operation_id="op-fail",
title="Title",
message="Message",
search_space_id=db_search_space.id,
)
updated = await handler.update_notification(
session=db_session,
notification=notification,
status="failed",
)
assert updated.notification_metadata["status"] == "failed"
assert "completed_at" in updated.notification_metadata

View file

@ -0,0 +1,62 @@
"""Behavior guard for the comment-reply notification handler."""
from __future__ import annotations
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSpace, User
from app.notifications.service import NotificationService
pytestmark = pytest.mark.integration
handler = NotificationService.comment_reply
async def _notify(db_session, db_user, db_search_space, *, reply_id=1, preview="hi"):
"""Raise a comment-reply notification for the assertions in the tests below."""
return await handler.notify_comment_reply(
session=db_session,
user_id=db_user.id,
reply_id=reply_id,
parent_comment_id=10,
message_id=20,
thread_id=30,
thread_title="Thread",
author_id="author-1",
author_name="Bob",
author_avatar_url=None,
author_email="bob@surfsense.net",
content_preview=preview,
search_space_id=db_search_space.id,
)
async def test_comment_reply_title_and_message(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""A reply notification names the author and carries the comment preview."""
notification = await _notify(db_session, db_user, db_search_space, preview="thanks")
assert notification.type == "comment_reply"
assert notification.title == "Bob replied in a thread"
assert notification.message == "thanks"
async def test_comment_reply_truncates_long_preview(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""A long comment preview is truncated in the reply message."""
notification = await _notify(db_session, db_user, db_search_space, preview="y" * 150)
assert notification.message == "y" * 100 + "..."
async def test_comment_reply_is_idempotent(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""Re-notifying the same reply id reuses the existing notification row."""
first = await _notify(db_session, db_user, db_search_space, reply_id=5)
second = await _notify(db_session, db_user, db_search_space, reply_id=5)
assert second.id == first.id

View file

@ -0,0 +1,235 @@
"""Behavior guard for the connector-indexing notification handler.
Exercises the real handler against Postgres via the transactional db_session,
pinning the title/message/status/metadata it produces so the upcoming
functional-core extraction cannot drift.
"""
from __future__ import annotations
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSpace, User
from app.notifications.service import NotificationService
pytestmark = pytest.mark.integration
async def test_indexing_started_opens_notification(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Starting indexing opens an unread notification with connecting-stage metadata."""
notification = await NotificationService.connector_indexing.notify_indexing_started(
session=db_session,
user_id=db_user.id,
connector_id=42,
connector_name="Notion - My Workspace",
connector_type="NOTION_CONNECTOR",
search_space_id=db_search_space.id,
)
assert notification.id is not None
assert notification.type == "connector_indexing"
assert notification.title == "Syncing: Notion - My Workspace"
assert notification.message == "Connecting to your account"
assert notification.read is False
metadata = notification.notification_metadata
assert metadata["connector_id"] == 42
assert metadata["connector_type"] == "NOTION_CONNECTOR"
assert metadata["indexed_count"] == 0
assert metadata["sync_stage"] == "connecting"
assert metadata["status"] == "in_progress"
assert "operation_id" in metadata
assert "started_at" in metadata
async def _started(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
*,
connector_name: str = "Notion - My Workspace",
):
"""Open a connector-indexing notification to update in the tests below."""
return await NotificationService.connector_indexing.notify_indexing_started(
session=db_session,
user_id=db_user.id,
connector_id=42,
connector_name=connector_name,
connector_type="NOTION_CONNECTOR",
search_space_id=db_search_space.id,
)
async def test_indexing_progress_reports_stage_and_percent(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Progress updates surface the stage message and compute a percent complete."""
notification = await _started(db_session, db_user, db_search_space)
updated = await NotificationService.connector_indexing.notify_indexing_progress(
session=db_session,
notification=notification,
indexed_count=5,
total_count=10,
stage="fetching",
)
assert updated.message == "Fetching your content"
metadata = updated.notification_metadata
assert metadata["indexed_count"] == 5
assert metadata["total_count"] == 10
assert metadata["progress_percent"] == 50
assert metadata["sync_stage"] == "fetching"
assert metadata["status"] == "in_progress"
async def test_indexing_completed_clean_success(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""A clean multi-file sync reports ready/completed with plural wording."""
notification = await _started(db_session, db_user, db_search_space)
done = await NotificationService.connector_indexing.notify_indexing_completed(
session=db_session,
notification=notification,
indexed_count=3,
)
assert done.title == "Ready: Notion - My Workspace"
assert done.message == "Now searchable! 3 files synced."
assert done.notification_metadata["status"] == "completed"
assert done.notification_metadata["sync_stage"] == "completed"
async def test_indexing_completed_singular_file(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""A single synced file uses singular 'file' wording."""
notification = await _started(db_session, db_user, db_search_space)
done = await NotificationService.connector_indexing.notify_indexing_completed(
session=db_session,
notification=notification,
indexed_count=1,
)
assert done.message == "Now searchable! 1 file synced."
async def test_indexing_completed_nothing_to_sync(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""Completing with nothing new reports 'Already up to date!'."""
notification = await _started(db_session, db_user, db_search_space)
done = await NotificationService.connector_indexing.notify_indexing_completed(
session=db_session,
notification=notification,
indexed_count=0,
)
assert done.title == "Ready: Notion - My Workspace"
assert done.message == "Already up to date!"
assert done.notification_metadata["status"] == "completed"
async def test_indexing_completed_hard_failure(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""An error with nothing synced reports a hard failure."""
notification = await _started(db_session, db_user, db_search_space)
done = await NotificationService.connector_indexing.notify_indexing_completed(
session=db_session,
notification=notification,
indexed_count=0,
error_message="boom",
)
assert done.title == "Failed: Notion - My Workspace"
assert done.message == "Sync failed: boom"
assert done.notification_metadata["status"] == "failed"
assert done.notification_metadata["sync_stage"] == "failed"
async def test_indexing_completed_partial_with_error_note(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""An error after partial progress still completes, with an appended note."""
notification = await _started(db_session, db_user, db_search_space)
done = await NotificationService.connector_indexing.notify_indexing_completed(
session=db_session,
notification=notification,
indexed_count=2,
error_message="partial outage",
)
assert done.title == "Ready: Notion - My Workspace"
assert done.message == "Now searchable! 2 files synced. Note: partial outage"
assert done.notification_metadata["status"] == "completed"
async def test_retry_progress_frames_delay_as_providers(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""A retry message frames the delay as the provider's, using its short name."""
notification = await _started(db_session, db_user, db_search_space)
retry = await NotificationService.connector_indexing.notify_retry_progress(
session=db_session,
notification=notification,
indexed_count=0,
retry_reason="rate_limit",
attempt=1,
max_attempts=3,
)
# service_name is derived from the connector name, stripping the workspace suffix.
assert retry.message == "Notion rate limit reached. Retrying..."
assert retry.notification_metadata["sync_stage"] == "waiting_retry"
assert retry.notification_metadata["retry_attempt"] == 1
assert retry.notification_metadata["retry_reason"] == "rate_limit"
async def test_retry_progress_shows_wait_and_synced_count(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
"""A retry surfaces the wait time and how many items synced so far."""
notification = await _started(db_session, db_user, db_search_space)
retry = await NotificationService.connector_indexing.notify_retry_progress(
session=db_session,
notification=notification,
indexed_count=2,
retry_reason="rate_limit",
attempt=2,
max_attempts=3,
wait_seconds=10,
)
assert (
retry.message
== "Notion rate limit reached. Retrying in 10s... (2 items synced so far)"
)

View file

@ -0,0 +1,80 @@
"""Behavior guard for the document-processing notification handler."""
from __future__ import annotations
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSpace, User
from app.notifications.service import NotificationService
pytestmark = pytest.mark.integration
handler = NotificationService.document_processing
async def _started(db_session, db_user, db_search_space, *, name="report.pdf"):
"""Open a document-processing notification to update in the tests below."""
return await handler.notify_processing_started(
session=db_session,
user_id=db_user.id,
document_type="FILE",
document_name=name,
search_space_id=db_search_space.id,
)
async def test_processing_started_queues(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""Starting processing queues a notification in the 'queued' stage."""
notification = await _started(db_session, db_user, db_search_space)
assert notification.type == "document_processing"
assert notification.title == "Processing: report.pdf"
assert notification.message == "Waiting in queue"
assert notification.notification_metadata["processing_stage"] == "queued"
async def test_processing_progress_maps_stage(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""A progress update maps the stage to its user-facing message."""
notification = await _started(db_session, db_user, db_search_space)
updated = await handler.notify_processing_progress(
session=db_session, notification=notification, stage="parsing"
)
assert updated.message == "Reading your file"
assert updated.notification_metadata["processing_stage"] == "parsing"
async def test_processing_completed_success(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""Successful processing reports ready/searchable and a completed status."""
notification = await _started(db_session, db_user, db_search_space)
done = await handler.notify_processing_completed(
session=db_session, notification=notification, document_id=99
)
assert done.title == "Ready: report.pdf"
assert done.message == "Now searchable!"
assert done.notification_metadata["status"] == "completed"
async def test_processing_completed_failure(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""Failed processing reports a failed status with the error in the message."""
notification = await _started(db_session, db_user, db_search_space)
done = await handler.notify_processing_completed(
session=db_session, notification=notification, error_message="bad file"
)
assert done.title == "Failed: report.pdf"
assert done.message == "Processing failed: bad file"
assert done.notification_metadata["status"] == "failed"

View file

@ -0,0 +1,221 @@
"""Behavior guard for the notifications inbox HTTP API.
Rows are seeded through the transactional db_session and read back through the
real endpoints (auth + DB bound to the same transaction), pinning list filters,
counts, mark-read semantics, and response mapping.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSpace, User
from app.notifications.persistence import Notification
pytestmark = pytest.mark.integration
BASE = "/api/v1/notifications"
async def _seed(
db_session: AsyncSession,
user: User,
*,
type: str = "document_processing",
title: str = "Title",
message: str = "Message",
read: bool = False,
search_space_id: int | None = None,
metadata: dict | None = None,
created_at: datetime | None = None,
) -> Notification:
"""Insert a notification row directly for the API tests to read back."""
notification = Notification(
user_id=user.id,
search_space_id=search_space_id,
type=type,
title=title,
message=message,
read=read,
notification_metadata=metadata or {},
)
if created_at is not None:
notification.created_at = created_at
db_session.add(notification)
await db_session.flush()
return notification
async def test_list_returns_user_notifications_mapped(client, db_session, db_user):
"""GET / returns the caller's notifications mapped to the response shape."""
seeded = await _seed(
db_session, db_user, type="document_processing", title="Doc done"
)
resp = await client.get(BASE)
assert resp.status_code == 200
body = resp.json()
assert body["total"] == 1
item = body["items"][0]
assert item["id"] == seeded.id
assert item["user_id"] == str(db_user.id)
assert item["type"] == "document_processing"
assert item["title"] == "Doc done"
assert item["read"] is False
assert item["created_at"] # ISO string present
async def test_list_orders_newest_first(client, db_session, db_user):
"""The list is ordered by creation time, newest first."""
now = datetime.now(UTC)
await _seed(db_session, db_user, title="older", created_at=now - timedelta(hours=2))
await _seed(db_session, db_user, title="newer", created_at=now)
resp = await client.get(BASE)
titles = [item["title"] for item in resp.json()["items"]]
assert titles == ["newer", "older"]
async def test_list_filters_by_category(client, db_session, db_user):
"""The category filter narrows results to that category's notification types."""
await _seed(db_session, db_user, type="connector_indexing", title="status item")
await _seed(db_session, db_user, type="comment_reply", title="comment item")
resp = await client.get(BASE, params={"category": "comments"})
titles = [item["title"] for item in resp.json()["items"]]
assert titles == ["comment item"]
async def test_list_filters_unread_only(client, db_session, db_user):
"""The unread filter returns only notifications that haven't been read."""
await _seed(db_session, db_user, title="unread one", read=False)
await _seed(db_session, db_user, title="read one", read=True)
resp = await client.get(BASE, params={"filter": "unread"})
titles = [item["title"] for item in resp.json()["items"]]
assert titles == ["unread one"]
async def test_list_filters_by_connector_source_type(client, db_session, db_user):
"""A 'connector:<type>' source filter selects only that connector's notifications."""
await _seed(
db_session,
db_user,
type="connector_indexing",
title="github",
metadata={"connector_type": "GITHUB_CONNECTOR"},
)
await _seed(
db_session,
db_user,
type="connector_indexing",
title="notion",
metadata={"connector_type": "NOTION_CONNECTOR"},
)
resp = await client.get(BASE, params={"source_type": "connector:GITHUB_CONNECTOR"})
titles = [item["title"] for item in resp.json()["items"]]
assert titles == ["github"]
async def test_list_rejects_invalid_before_date(client, db_session, db_user):
"""A malformed before_date is rejected with a 400."""
await _seed(db_session, db_user)
resp = await client.get(BASE, params={"before_date": "not-a-date"})
assert resp.status_code == 400
async def test_list_paginates_with_has_more(client, db_session, db_user):
"""Pagination caps the page and reports has_more plus the next offset."""
now = datetime.now(UTC)
for i in range(3):
await _seed(
db_session, db_user, title=f"n{i}", created_at=now - timedelta(minutes=i)
)
resp = await client.get(BASE, params={"limit": 2, "offset": 0})
body = resp.json()
assert len(body["items"]) == 2
assert body["has_more"] is True
assert body["next_offset"] == 2
async def test_unread_count_splits_total_and_recent(client, db_session, db_user):
"""The unread count reports total unread and a recent-window subset."""
now = datetime.now(UTC)
await _seed(db_session, db_user, read=False, created_at=now)
await _seed(db_session, db_user, read=False, created_at=now - timedelta(days=30))
await _seed(db_session, db_user, read=True, created_at=now)
resp = await client.get(f"{BASE}/unread-count")
body = resp.json()
assert body["total_unread"] == 2
assert body["recent_unread"] == 1
async def test_unread_counts_batch_by_category(client, db_session, db_user):
"""The batch endpoint breaks unread counts down per category."""
await _seed(db_session, db_user, type="comment_reply", read=False)
await _seed(db_session, db_user, type="connector_indexing", read=False)
resp = await client.get(f"{BASE}/unread-counts-batch")
body = resp.json()
assert body["comments"]["total_unread"] == 1
assert body["status"]["total_unread"] == 1
async def test_mark_read_then_idempotent(client, db_session, db_user):
"""Marking read succeeds, and a repeat call is a no-op reporting already-read."""
notification = await _seed(db_session, db_user, read=False)
first = await client.patch(f"{BASE}/{notification.id}/read")
assert first.status_code == 200
assert first.json()["success"] is True
second = await client.patch(f"{BASE}/{notification.id}/read")
assert second.status_code == 200
assert second.json()["message"] == "Notification already marked as read"
async def test_mark_read_foreign_notification_404(client, db_session, db_user):
"""Marking another user's notification read returns 404, not a cross-user write."""
other = User(
email="other@surfsense.net",
hashed_password="hashed",
is_active=True,
is_superuser=False,
is_verified=True,
)
db_session.add(other)
await db_session.flush()
foreign = await _seed(db_session, other, read=False)
resp = await client.patch(f"{BASE}/{foreign.id}/read")
assert resp.status_code == 404
async def test_mark_all_read_returns_count(client, db_session, db_user):
"""Mark-all-read flips only the unread rows and returns how many changed."""
await _seed(db_session, db_user, read=False)
await _seed(db_session, db_user, read=False)
await _seed(db_session, db_user, read=True)
resp = await client.patch(f"{BASE}/read-all")
body = resp.json()
assert body["success"] is True
assert body["updated_count"] == 2

View file

@ -0,0 +1,62 @@
"""Behavior guard for the @mention notification handler."""
from __future__ import annotations
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSpace, User
from app.notifications.service import NotificationService
pytestmark = pytest.mark.integration
handler = NotificationService.mention
async def _notify(db_session, db_user, db_search_space, *, mention_id=1, preview="hi"):
"""Raise an @mention notification for the assertions in the tests below."""
return await handler.notify_new_mention(
session=db_session,
mentioned_user_id=db_user.id,
mention_id=mention_id,
comment_id=10,
message_id=20,
thread_id=30,
thread_title="Thread",
author_id="author-1",
author_name="Alice",
author_avatar_url=None,
author_email="alice@surfsense.net",
content_preview=preview,
search_space_id=db_search_space.id,
)
async def test_new_mention_title_and_message(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""A mention notification names the author and carries the comment preview."""
notification = await _notify(db_session, db_user, db_search_space, preview="hello")
assert notification.type == "new_mention"
assert notification.title == "Alice mentioned you"
assert notification.message == "hello"
async def test_new_mention_truncates_long_preview(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""A long comment preview is truncated in the mention message."""
notification = await _notify(db_session, db_user, db_search_space, preview="x" * 150)
assert notification.message == "x" * 100 + "..."
async def test_new_mention_is_idempotent(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""Re-notifying the same mention id reuses the existing notification row."""
first = await _notify(db_session, db_user, db_search_space, mention_id=7)
second = await _notify(db_session, db_user, db_search_space, mention_id=7)
assert second.id == first.id

View file

@ -0,0 +1,61 @@
"""Behavior guard for the page-limit notification handler."""
from __future__ import annotations
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSpace, User
from app.notifications.service import NotificationService
pytestmark = pytest.mark.integration
handler = NotificationService.page_limit
async def test_page_limit_message_and_action(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""A page-limit notification states usage and carries an upgrade action link."""
notification = await handler.notify_page_limit_exceeded(
session=db_session,
user_id=db_user.id,
document_name="short.pdf",
document_type="FILE",
search_space_id=db_search_space.id,
pages_used=95,
pages_limit=100,
pages_to_add=10,
)
assert notification.type == "page_limit_exceeded"
assert notification.title == "Page limit exceeded: short.pdf"
assert notification.message == (
"This document has ~10 page(s) but you've used 95/100 pages. "
"Upgrade to process more documents."
)
assert notification.notification_metadata["status"] == "failed"
assert notification.notification_metadata["action_label"] == "Upgrade Plan"
assert notification.notification_metadata["action_url"] == (
f"/dashboard/{db_search_space.id}/more-pages"
)
async def test_page_limit_truncates_long_name(
db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
):
"""A long document name is truncated in the notification title."""
long_name = "a" * 50
notification = await handler.notify_page_limit_exceeded(
session=db_session,
user_id=db_user.id,
document_name=long_name,
document_type="FILE",
search_space_id=db_search_space.id,
pages_used=95,
pages_limit=100,
pages_to_add=10,
)
assert notification.title == f"Page limit exceeded: {'a' * 40}..."