diff --git a/surfsense_backend/app/event_bus/__init__.py b/surfsense_backend/app/event_bus/__init__.py new file mode 100644 index 000000000..b71cc1b64 --- /dev/null +++ b/surfsense_backend/app/event_bus/__init__.py @@ -0,0 +1,20 @@ +"""In-process domain event bus. + +Domain-agnostic pub/sub. Producers ``await bus.publish(...)``; subscribers +``bus.subscribe(...)``. Domain modules depend on it, never the reverse. + + from app.event_bus import bus + await bus.publish("document.indexed", {"document_id": 42}, search_space_id=7) +""" + +from __future__ import annotations + +from .bus import EventBus, Subscriber, bus +from .event import Event + +__all__ = [ + "Event", + "EventBus", + "Subscriber", + "bus", +] diff --git a/surfsense_backend/app/event_bus/bus.py b/surfsense_backend/app/event_bus/bus.py new file mode 100644 index 000000000..38c93ba7c --- /dev/null +++ b/surfsense_backend/app/event_bus/bus.py @@ -0,0 +1,77 @@ +"""In-process pub/sub. Streams :class:`Event` values from producers to listeners. + +Boundary-crossing (Celery, DB, workers) is a subscriber's job — e.g. the +``event`` trigger enqueues its own task. +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable +from typing import Any + +from .event import Event + +logger = logging.getLogger(__name__) + +Subscriber = Callable[[Event], Awaitable[None]] + + +class EventBus: + """An in-process pub/sub bus with a per-instance subscriber registry.""" + + def __init__(self) -> None: + self._subscribers: list[Subscriber] = [] + + def subscribe(self, handler: Subscriber) -> Subscriber: + """Register ``handler`` for every event. 
Idempotent; returns the handler + so it works as a decorator.""" + if handler not in self._subscribers: + self._subscribers.append(handler) + return handler + + def subscribers(self) -> list[Subscriber]: + """Defensive snapshot of the registered subscribers.""" + return list(self._subscribers) + + async def publish( + self, + event_type: str, + payload: dict[str, Any] | None = None, + *, + search_space_id: int, + ) -> None: + """Stamp an :class:`Event` and fan it out. Call after your commit.""" + event = Event( + event_type=event_type, + payload=payload or {}, + search_space_id=search_space_id, + ) + await self.dispatch(event) + + async def dispatch(self, event: Event) -> None: + """Fan ``event`` out concurrently. Subscriber failures are logged and + isolated; never propagate.""" + subscribers = self.subscribers() + if not subscribers: + return + + results = await asyncio.gather( + *(handler(event) for handler in subscribers), + return_exceptions=True, + ) + + for handler, result in zip(subscribers, results, strict=True): + if isinstance(result, Exception): + logger.error( + "event subscriber %r failed for event %s (%s)", + getattr(handler, "__qualname__", handler), + event.event_id, + event.event_type, + exc_info=result, + ) + + +# Process-wide bus. Producers publish to it; subscribers register on it. +bus = EventBus() diff --git a/surfsense_backend/app/event_bus/event.py b/surfsense_backend/app/event_bus/event.py new file mode 100644 index 000000000..5dc3f7081 --- /dev/null +++ b/surfsense_backend/app/event_bus/event.py @@ -0,0 +1,38 @@ +"""The ``Event`` value object — the only shape that crosses the bus. + +An immutable fact: something named happened, with this payload, in this space, +at this time. JSON round-trippable so a subscriber can queue it to a worker. 
def _new_event_id() -> str:
    """Return a fresh random event id: the hex form of a UUID4."""
    token = uuid.uuid4()
    return token.hex


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=UTC)


class Event(BaseModel):
    """A published domain fact — the only shape that crosses the bus.

    Something named happened, with this payload, in this search space, at this
    time. The caller supplies ``event_type``, ``payload`` and
    ``search_space_id``; ``event_id`` and ``occurred_at`` are filled in by
    their default factories when not given. Intended to round-trip through
    JSON so a subscriber can queue it to a worker.
    """

    # Frozen model: attribute assignment after construction is rejected.
    # NOTE(review): this presumably does not stop in-place mutation of the
    # ``payload`` dict itself — confirm if deep immutability is expected.
    model_config = ConfigDict(frozen=True)

    # Dotted namespace naming the fact, e.g. ``document.indexed``.
    event_type: str
    # Event data; expected to be JSON-serializable. Defaults to an empty dict.
    payload: dict[str, Any] = Field(default_factory=dict)
    # Search space the event is scoped to.
    search_space_id: int
    # Stamped identity: a new UUID4 hex string per instance unless supplied.
    event_id: str = Field(default_factory=_new_event_id)
    # Stamped time: UTC "now" at construction unless supplied.
    occurred_at: datetime = Field(default_factory=_now)
# Every test in this suite is a unit test.
pytestmark = pytest.mark.unit


# === test_bus.py — ``EventBus`` contract: subscribe, publish, dispatch =======
# Each test uses a fresh ``EventBus`` — no shared global state.


def _event() -> Event:
    """Build a minimal valid event for tests that only need *an* event."""
    return Event(event_type="x.happened", payload={"k": "v"}, search_space_id=1)


async def _noop(_event: Event) -> None:
    """Subscriber that does nothing."""
    return None


async def _other(_event: Event) -> None:
    """A second, distinct do-nothing subscriber."""
    return None


# --- registry -------------------------------------------------------------


def test_subscribe_then_subscribers_returns_the_handler() -> None:
    bus = EventBus()
    bus.subscribe(_noop)

    assert _noop in bus.subscribers()


def test_subscribe_is_idempotent_for_the_same_handler() -> None:
    """Registering the same handler twice must not make it fire twice."""
    bus = EventBus()
    bus.subscribe(_noop)
    bus.subscribe(_noop)

    assert bus.subscribers().count(_noop) == 1


def test_distinct_handlers_both_register() -> None:
    bus = EventBus()
    bus.subscribe(_noop)
    bus.subscribe(_other)

    registered = bus.subscribers()
    assert _noop in registered
    assert _other in registered


def test_subscribers_returns_a_defensive_snapshot() -> None:
    """Mutating the returned list must not corrupt the registry."""
    bus = EventBus()
    bus.subscribe(_noop)

    snapshot = bus.subscribers()
    snapshot.clear()

    assert _noop in bus.subscribers()


def test_subscribe_returns_handler_so_it_can_be_used_as_a_decorator() -> None:
    bus = EventBus()
    returned = bus.subscribe(_other)

    assert returned is _other


def test_two_buses_do_not_share_subscribers() -> None:
    """The registry is per-instance, not global."""
    a = EventBus()
    b = EventBus()
    a.subscribe(_noop)

    assert _noop in a.subscribers()
    assert _noop not in b.subscribers()


# --- dispatch -------------------------------------------------------------


async def test_dispatch_delivers_event_to_every_subscriber() -> None:
    bus = EventBus()
    seen: list[tuple[str, Event]] = []

    async def first(event: Event) -> None:
        seen.append(("first", event))

    async def second(event: Event) -> None:
        seen.append(("second", event))

    bus.subscribe(first)
    bus.subscribe(second)

    event = _event()
    await bus.dispatch(event)

    assert ("first", event) in seen
    assert ("second", event) in seen


async def test_dispatch_isolates_a_failing_subscriber() -> None:
    """A subscriber that raises must not stop a healthy one from running."""
    bus = EventBus()
    healthy_ran = False

    async def boom(_event: Event) -> None:
        raise RuntimeError("subscriber blew up")

    async def healthy(_event: Event) -> None:
        nonlocal healthy_ran
        healthy_ran = True

    bus.subscribe(boom)
    bus.subscribe(healthy)

    await bus.dispatch(_event())

    assert healthy_ran is True


async def test_dispatch_never_propagates_subscriber_errors() -> None:
    """``dispatch`` itself must not raise even if every subscriber fails."""
    bus = EventBus()

    async def boom(_event: Event) -> None:
        raise ValueError("nope")

    bus.subscribe(boom)

    await bus.dispatch(_event())  # must not raise


async def test_dispatch_with_no_subscribers_is_a_noop() -> None:
    bus = EventBus()
    await bus.dispatch(_event())  # must not raise


# --- publish --------------------------------------------------------------


async def test_publish_builds_a_stamped_event_and_fans_it_out() -> None:
    bus = EventBus()
    received: list[Event] = []

    async def handler(event: Event) -> None:
        received.append(event)

    bus.subscribe(handler)
    await bus.publish("document.indexed", {"document_id": 42}, search_space_id=7)

    assert len(received) == 1
    event = received[0]
    assert event.event_type == "document.indexed"
    assert event.payload == {"document_id": 42}
    assert event.search_space_id == 7
    # Engine-stamped identity/time on the way through. A datetime is always
    # truthy, so check something that can actually fail: a non-empty string id
    # and a timezone-aware timestamp.
    assert isinstance(event.event_id, str)
    assert event.event_id
    assert event.occurred_at.utcoffset() is not None


async def test_publish_defaults_payload_to_empty_dict() -> None:
    bus = EventBus()
    received: list[Event] = []

    async def handler(event: Event) -> None:
        received.append(event)

    bus.subscribe(handler)
    await bus.publish("x.happened", search_space_id=1)

    assert received[0].payload == {}


async def test_publish_with_no_subscribers_is_a_noop() -> None:
    await EventBus().publish("x.happened", search_space_id=1)  # must not raise


# === test_event.py — ``Event`` contract ======================================
# Carry caller facts + engine-stamped id/time, round-trip JSON.


def test_event_carries_caller_supplied_facts() -> None:
    """The three caller inputs are stored verbatim."""
    event = Event(
        event_type="document.indexed",
        payload={"document_id": 42, "content_type": "pdf"},
        search_space_id=7,
    )

    assert event.event_type == "document.indexed"
    assert event.payload == {"document_id": 42, "content_type": "pdf"}
    assert event.search_space_id == 7


def test_event_stamps_identity_and_time_when_not_supplied() -> None:
    """Engine stamps id + time so subscribers can dedup/order."""
    event = Event(event_type="x.happened", payload={}, search_space_id=1)

    assert isinstance(event.event_id, str)
    assert event.event_id
    assert isinstance(event.occurred_at, datetime)
    # The stamp must be timezone-aware so ordering across producers is sound.
    assert event.occurred_at.utcoffset() is not None


def test_event_ids_are_unique_per_instance() -> None:
    """Two events published with identical content are still distinct facts."""
    first = Event(event_type="x.happened", payload={}, search_space_id=1)
    second = Event(event_type="x.happened", payload={}, search_space_id=1)

    assert first.event_id != second.event_id


def test_event_survives_json_round_trip() -> None:
    """Serialize → deserialize reproduces the event (subscribers queue it as JSON)."""
    original = Event(
        event_type="podcast.generated",
        payload={"podcast_id": 9, "duration_s": 123.5},
        search_space_id=3,
    )

    restored = Event.model_validate_json(original.model_dump_json())

    assert restored == original