Source code for simvx.core.event_bus

"""Typed event bus with weak handler references.

Handlers subscribe against a *dataclass* event type. Calling
:meth:`EventBus.publish` with an instance of that dataclass synchronously
fans the event out to every live handler subscribed for the *exact* class.
There is no MRO walk: a handler for ``Parent`` does NOT receive publishes
of ``Child(Parent)`` -- subclasses must subscribe themselves. This keeps
dispatch predictable and O(1) per event class.

Vocabulary (subscribe/publish/unsubscribe) is intentionally distinct from
:class:`Signal`'s connect/emit/disconnect: an ``EventBus`` is a typed pub/sub
hub, while a Signal is a per-instance callback list. Different shapes, so
different verbs.

Handlers are stored as weak references:

* Bound methods are wrapped in :class:`weakref.WeakMethod` so the
  subscription is automatically dropped when the owning instance is garbage
  collected.
* Free functions and other callables are stored via :class:`weakref.ref`.
  CPython 3.13+ supports weakrefs to module-level functions, but local
  closures and ``functools.partial`` objects do not. Those raise
  :class:`TypeError` at subscribe time -- callers must use a method on a
  long-lived object, or hold their own strong reference and pass a
  module-level function.

Two dispatch modes:

* :meth:`publish` -- synchronous. Handlers run in registration order on
  the calling thread before ``publish`` returns.
* :meth:`publish_deferred` -- queues the event. The queue is drained when
  :meth:`flush_deferred` is called (typically once per frame).

Threading: the bus assumes a single-threaded engine. There are no locks.
Calling ``publish`` from a non-main thread is undefined behaviour.
"""

from __future__ import annotations

import dataclasses
import weakref
from collections.abc import Callable
from typing import Any

EventCls = type
Handler = Callable[[Any], None]


[docs] class EventBus: """Typed publish/subscribe bus keyed on dataclass event types.""" __slots__ = ("_subs", "_deferred") def __init__(self) -> None: self._subs: dict[EventCls, set[weakref.ReferenceType[Any]]] = {} self._deferred: list[Any] = [] # ------------------------------------------------------------------ # Subscription # ------------------------------------------------------------------
[docs] def subscribe(self, event_cls: EventCls, handler: Handler) -> None: """Register ``handler`` for events of exactly ``event_cls``. Bound methods are stored via :class:`weakref.WeakMethod`. Other callables use :class:`weakref.ref`; if the callable does not support weak references (e.g. a local closure or :class:`functools.partial`), a :class:`TypeError` is raised. """ ref = self._make_ref(handler) self._subs.setdefault(event_cls, set()).add(ref)
[docs] def unsubscribe(self, event_cls: EventCls, handler: Handler) -> None: """Remove ``handler`` from ``event_cls``. Idempotent.""" bucket = self._subs.get(event_cls) if not bucket: return target = self._make_ref(handler) # WeakMethod and weakref.ref both implement __eq__ by comparing # the underlying referent, so set.discard works on identity of the # referenced callable. bucket.discard(target) if not bucket: del self._subs[event_cls]
# ------------------------------------------------------------------ # Dispatch # ------------------------------------------------------------------
[docs] def publish(self, event: Any) -> None: """Synchronously fan ``event`` out to handlers of its exact type. Raises :class:`TypeError` if ``event`` is not a dataclass instance. Dead weak references are pruned lazily during iteration. """ if not dataclasses.is_dataclass(event) or isinstance(event, type): raise TypeError( f"EventBus.publish requires a dataclass instance, got {type(event).__name__!r}" ) bucket = self._subs.get(type(event)) if not bucket: return dead: list[weakref.ReferenceType[Any]] = [] for ref in list(bucket): handler = ref() if handler is None: dead.append(ref) continue handler(event) if dead: for ref in dead: bucket.discard(ref) if not bucket: self._subs.pop(type(event), None)
[docs] def publish_deferred(self, event: Any) -> None: """Queue ``event`` for the next :meth:`flush_deferred` call.""" if not dataclasses.is_dataclass(event) or isinstance(event, type): raise TypeError( f"EventBus.publish_deferred requires a dataclass instance, got {type(event).__name__!r}" ) self._deferred.append(event)
[docs] def flush_deferred(self) -> None: """Drain the deferred queue, dispatching each event via :meth:`publish`.""" if not self._deferred: return # Swap-and-clear so handlers that publish_deferred during flush land # on the next flush, not the current one. pending = self._deferred self._deferred = [] for event in pending: self.publish(event)
# ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ @staticmethod def _make_ref(handler: Handler) -> weakref.ReferenceType[Any]: """Build the appropriate weak reference for ``handler``.""" if hasattr(handler, "__self__") and hasattr(handler, "__func__"): return weakref.WeakMethod(handler) try: return weakref.ref(handler) except TypeError as exc: raise TypeError( f"EventBus handler {handler!r} does not support weak references; " "use a bound method on a long-lived object or a module-level function." ) from exc