Event Bus

The :class:simvx.core.event_bus.EventBus is SimVX’s typed publish/subscribe channel for decoupled inter-node communication. Producers publish a dataclass instance describing what happened; handlers subscribe against the dataclass type and are called whenever an instance of that exact type is published.

The bus is created automatically per :class:SceneTree and exposed as tree.events. There is no manual setup, no need to instantiate or inject – engines, autoloads, and nodes all share the same bus. The vocabulary (subscribe / publish / unsubscribe) is intentionally distinct from :class:Signal’s connect / emit / disconnect: EventBus is a typed pub/sub hub, Signal is a per-instance callback list.

self.tree.events.subscribe(PlayerDied, self._on_player_died)
self.tree.events.publish(PlayerDied(player=self))

This is the one canonical way to broadcast game-wide events. Use :class:Signal for parent-child notifications scoped to a single node; use tree.events for “anyone in the scene tree might care” events.

Signals vs EventBus – when to use which

| Use a :class:Signal when… | Use tree.events when… |
|---|---|
| The receiver already has a reference to the emitter (e.g. a parent watching a child). | The producer and consumer are decoupled (different parts of the tree, autoloads, plugins). |
| The connection lives and dies with one specific instance. | Subscribers come and go independently of any specific publisher. |
| You want low-overhead per-instance fanout, no type registration. | You want a type-checked payload – a dataclass the inspector and IDE can introspect. |

There is no fall-back path that does both. Pick one per event.

Defining events

Events are :func:dataclasses.dataclass instances. Frozen is recommended because handlers should not mutate the payload after dispatch:

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .nodes.player import Player


@dataclass(frozen=True)
class PlayerDied:
    """Published by Player when hp first reaches zero."""

    player: Player


@dataclass(frozen=True)
class PlayerLevelledUp:
    player: Player
    new_level: int

Per the one obvious way rule: each game situation maps to one event class. Don’t fan out parallel events for the same situation – handlers infer related state (XP, gold, kill count) from the payload or from the referenced node.
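To illustrate why frozen payloads are recommended, the following standard-library sketch shows what happens when a handler tries to mutate an event after dispatch. ScoreChanged is a hypothetical event used only for this illustration; it is not part of SimVX.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ScoreChanged:  # hypothetical event, for illustration only
    new_score: int


evt = ScoreChanged(new_score=10)

try:
    evt.new_score = 99  # a handler trying to mutate the payload
except FrozenInstanceError:
    mutated = False  # frozen dataclasses reject attribute assignment
else:
    mutated = True

# With the default eq=True, frozen dataclasses also compare and hash by value.
same = evt == ScoreChanged(new_score=10)
```

Every handler in the fanout therefore sees the same payload, regardless of the order in which handlers run.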

Important

EventBus.publish raises :class:TypeError if the value is not a dataclass instance. Tuples, dicts, and ad-hoc namespaces are rejected. This keeps the bus self-documenting: every event class is a discoverable Python type that the editor’s “Find Usages” can follow.
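How EventBus performs this check internally is not shown in this page. As a sketch of what a "dataclass instance" test can look like with the standard library alone, the hypothetical helper below (is_event_instance is an assumed name, not a SimVX function) accepts instances and rejects classes, tuples, and dicts:

```python
from dataclasses import dataclass, is_dataclass


def is_event_instance(value: object) -> bool:
    """True only for dataclass instances.

    dataclasses.is_dataclass() alone also returns True for the class itself,
    so the isinstance(value, type) test filters classes out.
    """
    return is_dataclass(value) and not isinstance(value, type)


@dataclass(frozen=True)
class DoorOpened:  # hypothetical event, for illustration only
    door_id: int


ok = is_event_instance(DoorOpened(door_id=1))       # instance: accepted
cls_rejected = not is_event_instance(DoorOpened)    # the class itself: rejected
dict_rejected = not is_event_instance({"door_id": 1})
tuple_rejected = not is_event_instance((1,))
```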

Subscribing

class HUD(Node):
    def on_ready(self) -> None:
        bus = self.tree.events
        bus.subscribe(PlayerDied, self._on_player_died)
        bus.subscribe(PlayerLevelledUp, self._on_level_up)

    def _on_player_died(self, evt: PlayerDied) -> None:
        self.show_game_over_for(evt.player)

    def _on_level_up(self, evt: PlayerLevelledUp) -> None:
        self.flash_level_banner(evt.new_level)

Always subscribe in on_ready(), not __init__. on_ready() runs after self.tree is bound; __init__ may run before the node is in the tree.

Weak-reference semantics

Handlers are stored as weak references. Two cases:

  • Bound methods (self._on_player_died) are wrapped in :class:weakref.WeakMethod. The subscription is dropped automatically when the owning node is garbage-collected. You do not need to call unsubscribe when a node leaves the tree.

  • Free functions (module-level def) are stored via :class:weakref.ref. CPython 3.13+ supports weakrefs to module-level functions.
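The two storage cases above can be observed with the standard library alone. This sketch does not use the bus; Receiver and free_handler are hypothetical stand-ins for a node and a module-level handler.

```python
import gc
import weakref


class Receiver:  # stand-in for a node; not a SimVX class
    def on_event(self, evt: object) -> str:
        return f"got {evt}"


def free_handler(evt: object) -> str:
    return f"free got {evt}"


receiver = Receiver()
method_ref = weakref.WeakMethod(receiver.on_event)  # bound-method case
func_ref = weakref.ref(free_handler)                # free-function case

alive_before = method_ref() is not None
result = method_ref()("ping")  # dereference, then call the bound method

del receiver   # drop the only strong reference to the owner
gc.collect()   # not strictly needed on CPython; keeps the demo deterministic

alive_after = method_ref() is not None  # the WeakMethod is now dead
func_alive = func_ref() is not None     # the module still holds free_handler
```

A plain weakref.ref to receiver.on_event would not work here: each attribute access creates a fresh bound-method object that dies immediately, which is the problem WeakMethod exists to solve.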

What does not work:

  • Local closures and lambdas – the bus raises :class:TypeError at subscribe time. Use a method on a long-lived object instead.

  • functools.partial – rejected in the same way. Define a small wrapper method on the receiver.

# WRONG -- lambdas and local closures are rejected: subscribe raises TypeError
def on_ready(self):
    bus = self.tree.events
    bus.subscribe(PlayerDied, lambda evt: self._show_overlay(evt.player))

# RIGHT -- method on self
def on_ready(self):
    self.tree.events.subscribe(PlayerDied, self._on_player_died)

def _on_player_died(self, evt: PlayerDied) -> None:
    self._show_overlay(evt.player)

Holding a partial as an instance attribute (self._handler = partial(...)) and passing self._handler does not work around this, even though the receiver’s lifetime would keep the partial alive: the bus still fails at subscribe time, which surfaces the mistake immediately. If the receiver genuinely needs partial application, store the extra arguments on the receiver and apply them inside a wrapper method.

Exact-class dispatch – no MRO walk

Events fan out to handlers registered for the exact dataclass type. There is no MRO walk:

@dataclass(frozen=True)
class EnemyDied:
    enemy: Enemy

@dataclass(frozen=True)
class BossDefeated(EnemyDied):
    """Boss is also an enemy, but is its own event."""
    boss: Boss

# Handlers ONLY hear their exact type:
bus.subscribe(EnemyDied, on_any_enemy_died)        # NOT called for BossDefeated
bus.subscribe(BossDefeated, on_boss_defeated)      # called for BossDefeated only
bus.publish(BossDefeated(enemy=b, boss=b))         # only on_boss_defeated runs

This rule is deliberate. Two reasons:

  1. Predictable dispatch. No surprise calls when refactoring class hierarchies. Adding a new event subclass cannot retroactively change what existing handlers see.

  2. O(1) per publish. No type-graph walk; one dict lookup per event.
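Both properties follow from keying the handler table on type(event). The sketch below is a deliberately simplified stand-in, not the SimVX implementation: TinyBus is a hypothetical class that holds strong references and skips the dataclass check, and the payload fields are plain strings.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


class TinyBus:
    """Illustrative stand-in: strong references, no weakrefs, no validation."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: Any) -> None:
        # type(event) is the exact class: one dict lookup, no MRO walk,
        # so handlers registered for a base class are never consulted.
        for handler in list(self._handlers.get(type(event), ())):
            handler(event)


@dataclass(frozen=True)
class EnemyDied:
    enemy: str


@dataclass(frozen=True)
class BossDefeated(EnemyDied):
    boss: str


calls: list[tuple[str, str]] = []


def on_any_enemy_died(evt: EnemyDied) -> None:
    calls.append(("enemy", evt.enemy))


def on_boss_defeated(evt: BossDefeated) -> None:
    calls.append(("boss", evt.boss))


bus = TinyBus()
bus.subscribe(EnemyDied, on_any_enemy_died)
bus.subscribe(BossDefeated, on_boss_defeated)

bus.publish(BossDefeated(enemy="ogre", boss="ogre"))  # only on_boss_defeated runs
bus.publish(EnemyDied(enemy="rat"))                   # only on_any_enemy_died runs
```

An isinstance-based dispatcher would have to scan every registered type, or walk type(event).__mro__, on each publish; the exact-type lookup avoids both.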

If you genuinely need a “match any subclass” behaviour, publish both events explicitly from the producer:

def die(self) -> None:
    self.tree.events.publish(BossDefeated(enemy=self, boss=self))
    self.tree.events.publish(EnemyDied(enemy=self))

Synchronous vs deferred dispatch

bus.publish(event)            # synchronous: handlers run before publish returns
bus.publish_deferred(event)   # queued: handlers run on the next frame's flush

publish runs handlers in registration order on the calling thread, then returns. Use it for events that must be observed before the next line of the caller’s code.

publish_deferred queues the event. The :class:SceneTree calls self.events.flush_deferred() once per process tick, before any _process runs. Use it when:

  • You’re inside a _physics_process and want UI to react on the next visual frame, not mid-physics.

  • A handler might publish further events – publish_deferred keeps recursion shallow and avoids re-entering a handler before it has finished.

  • Many producers publish during one frame and you want one batch of UI updates, not N.

Events queued during a flush are dispatched on the next flush, not the current one. This bounds re-entrancy.
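One common way to get this "next flush, not the current one" behaviour is to swap the pending list out before draining it. The sketch below illustrates that pattern under the assumption that the bus does something similar; DeferredQueue and Tick are hypothetical names, and the real flush_deferred() may be implemented differently.

```python
from dataclasses import dataclass
from typing import Any, Callable


class DeferredQueue:
    """Illustrative only: the swap-then-drain pattern, not SimVX internals."""

    def __init__(self, dispatch: Callable[[Any], None]) -> None:
        self._dispatch = dispatch
        self._pending: list[Any] = []

    def publish_deferred(self, event: Any) -> None:
        self._pending.append(event)

    def flush_deferred(self) -> int:
        # Swap the queue out first, so events queued by handlers during this
        # flush land in the fresh list and wait for the next flush.
        batch, self._pending = self._pending, []
        for event in batch:
            self._dispatch(event)
        return len(batch)


@dataclass(frozen=True)
class Tick:  # hypothetical event, for illustration only
    n: int


seen: list[int] = []


def on_tick(evt: Tick) -> None:
    seen.append(evt.n)
    if evt.n < 3:
        queue.publish_deferred(Tick(evt.n + 1))  # re-publish from a handler


queue = DeferredQueue(on_tick)
queue.publish_deferred(Tick(1))

first = queue.flush_deferred()   # dispatches Tick(1); Tick(2) waits
second = queue.flush_deferred()  # dispatches Tick(2); Tick(3) waits
third = queue.flush_deferred()   # dispatches Tick(3); nothing re-queued
fourth = queue.flush_deferred()  # queue is empty
```

Each flush handles exactly one event here, even though every handler call queues another, so a chain of re-published events costs one frame per link instead of growing the call stack.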

Threading

The bus assumes a single-threaded engine and takes no locks. Calling publish, publish_deferred, or subscribe from a worker thread is undefined behaviour. publish_deferred is not a thread-safe escape hatch: it too must be called from the main thread. If work finishes on another thread, cross the thread boundary with the engine’s own coroutine / task plumbing first, then publish from the main thread.
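The engine’s task plumbing is not documented in this page, so the sketch below substitutes a standard-library queue.Queue purely to show the shape of the hand-off: the worker only enqueues, and the main thread drains and publishes. AssetLoaded, mailbox, and drain_on_main_thread are hypothetical names, and the publish argument stands in for tree.events.publish or tree.events.publish_deferred.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class AssetLoaded:  # hypothetical event, for illustration only
    path: str


# Thread-safe mailbox; an assumed stand-in for the engine's own task plumbing.
mailbox: "queue.Queue[AssetLoaded]" = queue.Queue()


def worker() -> None:
    # Worker threads never touch the bus; they only enqueue finished results.
    mailbox.put(AssetLoaded(path="textures/wall.png"))


def drain_on_main_thread(publish: Callable[[AssetLoaded], None]) -> int:
    """Call from the main thread, e.g. once per tick; returns events forwarded."""
    forwarded = 0
    while True:
        try:
            event = mailbox.get_nowait()
        except queue.Empty:
            return forwarded
        publish(event)  # in a real project: the bus's publish or publish_deferred
        forwarded += 1


received: list[AssetLoaded] = []
thread = threading.Thread(target=worker)
thread.start()
thread.join()

count = drain_on_main_thread(received.append)  # list.append stands in for the bus
```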

The events autoload reservation

The name events on a :class:SceneTree is reserved for the EventBus. The autoload registry rejects any project that tries to register an autoload under that name:

# This raises at project load:
# ProjectSettings.from_toml(...) → ValueError
[autoload]
events = "my_game.scripts.MyEventManager"

A node attribute named events cannot shadow the bus: because the bus is a property on the tree, the path self.tree.events is always unambiguous. (A node named events is unrelated and fine.)

The bus survives change_scene(): subscriptions held by autoloads or other long-lived objects keep firing across scene swaps. Subscriptions held by transient nodes drop automatically when those nodes are freed, thanks to the weakref storage.

Worked example: Dungeon Explorer combat

Dungeon Explorer’s combat refactor uses the bus for everything that crosses node boundaries.

Define the events (games/dungeon_explorer/events.py):

from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .nodes.boss_enemy import BossEnemy
    from .nodes.player import Player


@dataclass(frozen=True)
class PlayerDied:
    player: Player


@dataclass(frozen=True)
class PlayerLevelledUp:
    player: Player
    new_level: int


@dataclass(frozen=True)
class BossDefeated:
    boss: BossEnemy


@dataclass(frozen=True)
class BossPhaseChanged:
    boss: BossEnemy
    new_phase: int

Producer (nodes/player.py excerpt):

class Player(CharacterBody2D):
    def take_damage(self, amount: float) -> None:
        if self.hp <= 0:
            return
        self.hp = max(0, self.hp - amount)
        if self.hp == 0:
            self.tree.events.publish(PlayerDied(player=self))

    def gain_xp(self, amount: int) -> None:
        self.xp += amount
        while self.xp >= self._xp_for_next_level():
            self.xp -= self._xp_for_next_level()
            self.level += 1
            self.tree.events.publish(PlayerLevelledUp(player=self, new_level=self.level))

Handler (nodes/player_hud.py excerpt):

class PlayerHUD(Control):
    def on_ready(self) -> None:
        bus = self.tree.events
        bus.subscribe(PlayerDied, self._on_player_died)
        bus.subscribe(PlayerLevelledUp, self._on_levelled_up)
        bus.subscribe(BossDefeated, self._on_boss_defeated)

    def _on_player_died(self, evt: PlayerDied) -> None:
        self.show_death_overlay()

    def _on_levelled_up(self, evt: PlayerLevelledUp) -> None:
        self.flash_level_banner(evt.new_level)

    def _on_boss_defeated(self, evt: BossDefeated) -> None:
        self.show_victory_screen(evt.boss)

The HUD never imports Player or BossEnemy; it knows only the event dataclasses. The subscriptions are bound methods of the HUD, so they drop when the HUD itself is freed, without any explicit unsubscribe. The HUD needs no __del__ of its own.

See :doc:save_system for the natural pairing: publish a SaveSucceeded or SaveFailed event from the save UI handler so unrelated subsystems (achievement tracker, telemetry) can react without coupling to the save button itself.
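As a sketch of what those two events could look like: only the class names SaveSucceeded and SaveFailed come from the text above. The slot and reason fields and the save_result_event helper are hypothetical, chosen for illustration and not taken from the save system documentation.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SaveSucceeded:
    slot: int  # hypothetical field


@dataclass(frozen=True)
class SaveFailed:
    slot: int    # hypothetical field
    reason: str  # hypothetical field


def save_result_event(slot: int, error: Exception | None) -> SaveSucceeded | SaveFailed:
    """Hypothetical helper: map a save outcome to exactly one event."""
    if error is None:
        return SaveSucceeded(slot=slot)
    return SaveFailed(slot=slot, reason=str(error))


ok_event = save_result_event(1, None)
bad_event = save_result_event(2, OSError("disk full"))
```

In a project, the save UI handler would pass the returned event to self.tree.events.publish(...). Keeping success and failure as separate classes matches exact-class dispatch: a subscriber that only cares about failures subscribes to SaveFailed alone.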