feat(event_bus): add in-process domain event bus

A standalone, domain-agnostic pub/sub seam: an EventBus that owns its
subscriber registry and streams Event values from producers to listeners
in process. Boundary-crossing (Celery/DB/workers) is left to subscribers,
keeping the bus single-responsibility. Includes the immutable Event value
object and full unit coverage.
This commit is contained in:
CREDO23 2026-05-29 15:26:12 +02:00
parent 5d90fbe99f
commit d6dfe53d62
6 changed files with 369 additions and 0 deletions

View file

@ -0,0 +1,20 @@
"""In-process domain event bus.
Domain-agnostic pub/sub. Producers ``await bus.publish(...)``; subscribers
``bus.subscribe(...)``. Domain modules depend on it, never the reverse.
from app.event_bus import bus
await bus.publish("document.indexed", {"document_id": 42}, search_space_id=7)
"""
from __future__ import annotations
from .bus import EventBus, Subscriber, bus
from .event import Event
__all__ = [
"Event",
"EventBus",
"Subscriber",
"bus",
]

View file

@ -0,0 +1,77 @@
"""In-process pub/sub. Streams :class:`Event` values from producers to listeners.
Boundary-crossing (Celery, DB, workers) is a subscriber's job — e.g. the
``event`` trigger enqueues its own task.
"""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any
from .event import Event
logger = logging.getLogger(__name__)
Subscriber = Callable[[Event], Awaitable[None]]
class EventBus:
"""An in-process pub/sub bus with a per-instance subscriber registry."""
def __init__(self) -> None:
self._subscribers: list[Subscriber] = []
def subscribe(self, handler: Subscriber) -> Subscriber:
"""Register ``handler`` for every event. Idempotent; returns the handler
so it works as a decorator."""
if handler not in self._subscribers:
self._subscribers.append(handler)
return handler
def subscribers(self) -> list[Subscriber]:
"""Defensive snapshot of the registered subscribers."""
return list(self._subscribers)
async def publish(
self,
event_type: str,
payload: dict[str, Any] | None = None,
*,
search_space_id: int,
) -> None:
"""Stamp an :class:`Event` and fan it out. Call after your commit."""
event = Event(
event_type=event_type,
payload=payload or {},
search_space_id=search_space_id,
)
await self.dispatch(event)
async def dispatch(self, event: Event) -> None:
"""Fan ``event`` out concurrently. Subscriber failures are logged and
isolated; never propagate."""
subscribers = self.subscribers()
if not subscribers:
return
results = await asyncio.gather(
*(handler(event) for handler in subscribers),
return_exceptions=True,
)
for handler, result in zip(subscribers, results, strict=True):
if isinstance(result, Exception):
logger.error(
"event subscriber %r failed for event %s (%s)",
getattr(handler, "__qualname__", handler),
event.event_id,
event.event_type,
exc_info=result,
)
# Process-wide bus. Producers publish to it; subscribers register on it.
bus = EventBus()

View file

@ -0,0 +1,38 @@
"""The ``Event`` value object — the only shape that crosses the bus.
An immutable fact: something named happened, with this payload, in this space,
at this time. JSON round-trippable so a subscriber can queue it to a worker.
"""
from __future__ import annotations
import uuid
from datetime import UTC, datetime
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
def _new_event_id() -> str:
return uuid.uuid4().hex
def _now() -> datetime:
return datetime.now(UTC)
class Event(BaseModel):
"""A published domain fact.
``event_type`` is a dotted namespace (``document.indexed``, etc). ``payload`` is
JSON-serializable. ``search_space_id`` scopes delivery. ``event_id`` and
``occurred_at`` are engine-stamped.
"""
model_config = ConfigDict(frozen=True)
event_type: str
payload: dict[str, Any] = Field(default_factory=dict)
search_space_id: int
event_id: str = Field(default_factory=_new_event_id)
occurred_at: datetime = Field(default_factory=_now)