Event Bus¶
The :class:simvx.core.event_bus.EventBus is SimVX’s typed
publish/subscribe channel for decoupled inter-node communication. Producers
publish a dataclass instance describing what happened; handlers subscribe
against the dataclass type and are called whenever an instance of that
exact type is published.
The bus is created automatically per :class:SceneTree and exposed as
tree.events. There is no manual setup, no need to instantiate or
inject – engines, autoloads, and nodes all share the same bus. The
vocabulary (subscribe / publish / unsubscribe) is intentionally
distinct from :class:Signal’s connect / emit / disconnect:
EventBus is a typed pub/sub hub, Signal is a per-instance callback list.
self.tree.events.subscribe(PlayerDied, self._on_player_died)
self.tree.events.publish(PlayerDied(player=self))
This is the one canonical way to broadcast game-wide events. Use
:class:Signal for parent-child notifications scoped to a single node; use
tree.events for “anyone in the scene tree might care” events.
Signals vs EventBus – when to use which¶
| Use a :class:Signal when | Use tree.events when |
|---|---|
| The receiver already has a reference to the emitter (e.g. a parent watching a child). | The producer and consumer are decoupled (different parts of the tree, autoloads, plugins). |
| The connection lives and dies with one specific instance. | Subscribers come and go independently of any specific publisher. |
| You want low-overhead per-instance fanout, no type registration. | You want a type-checked payload – a dataclass the inspector and IDE can introspect. |
There is no fall-back path that does both. Pick one per event.
Defining events¶
Events are :func:dataclasses.dataclass instances. Frozen is recommended
because handlers should not mutate the payload after dispatch:
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .nodes.player import Player

@dataclass(frozen=True)
class PlayerDied:
    """Published by Player when hp first reaches zero."""
    player: Player

@dataclass(frozen=True)
class PlayerLevelledUp:
    player: Player
    new_level: int
Per the one obvious way rule: each game situation maps to one event class. Don’t fan out parallel events for the same situation – handlers infer related state (XP, gold, kill count) from the payload or from the referenced node.
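The effect of frozen=True can be checked with the standard library alone. The sketch below is illustrative: ScoreChanged and try_to_mutate are hypothetical names invented for this example, not part of SimVX.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ScoreChanged:
    """Hypothetical event, used only for this illustration."""

    new_score: int


def try_to_mutate(evt: ScoreChanged) -> bool:
    """Return True if the payload could be mutated, False if it is frozen."""
    try:
        evt.new_score = 0  # type: ignore[misc]
    except FrozenInstanceError:
        return False
    return True


event = ScoreChanged(new_score=10)
mutated = try_to_mutate(event)  # the frozen dataclass rejects the assignment
```

A handler that tries to rewrite the payload fails loudly instead of silently changing what later handlers see.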
Important
EventBus.publish raises :class:TypeError if the value is not a
dataclass instance. Tuples, dicts, and ad-hoc namespaces are rejected.
This keeps the bus self-documenting: every event class is a discoverable
Python type that the editor’s “Find Usages” can follow.
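A guard of this kind can be written with dataclasses.is_dataclass. The following is a minimal sketch of such a check, not SimVX's actual code: require_event, is_rejected, and DoorOpened are hypothetical names, and rejecting the dataclass type itself (as opposed to an instance) is an assumption about how "instance" is interpreted.

```python
from dataclasses import dataclass, is_dataclass


def require_event(value: object) -> object:
    """Hypothetical guard: accept dataclass instances only."""
    # is_dataclass() is also True for the dataclass *type* itself,
    # so classes are excluded explicitly.
    if not is_dataclass(value) or isinstance(value, type):
        raise TypeError(
            f"event must be a dataclass instance, got {type(value).__name__}"
        )
    return value


@dataclass(frozen=True)
class DoorOpened:
    """Hypothetical event for this illustration."""

    door_id: int


def is_rejected(value: object) -> bool:
    """Return True if the guard raises TypeError for this value."""
    try:
        require_event(value)
    except TypeError:
        return True
    return False
```

With this guard, is_rejected(DoorOpened(1)) is False, while a dict, a tuple, or the DoorOpened class itself is rejected.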
Subscribing¶
class HUD(Node):
    def on_ready(self) -> None:
        bus = self.tree.events
        bus.subscribe(PlayerDied, self._on_player_died)
        bus.subscribe(PlayerLevelledUp, self._on_level_up)

    def _on_player_died(self, evt: PlayerDied) -> None:
        self.show_game_over_for(evt.player)

    def _on_level_up(self, evt: PlayerLevelledUp) -> None:
        self.flash_level_banner(evt.new_level)
Always subscribe in on_ready(), not __init__. on_ready() runs
after self.tree is bound; __init__ may run before the node is in
the tree.
Weak-reference semantics¶
Handlers are stored as weak references. Two cases:
Bound methods (self._on_player_died) are wrapped in :class:weakref.WeakMethod. The subscription is dropped automatically when the owning node is garbage-collected. You do not need to call unsubscribe when a node leaves the tree.
Free functions (module-level def) are stored via :class:weakref.ref. CPython 3.13+ supports weakrefs to module-level functions.
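The two storage strategies can be observed with the standard weakref module on its own. This sketch does not touch the bus; Listener and module_level_handler are hypothetical stand-ins for a node and a free-function handler.

```python
import gc
import weakref


class Listener:
    """Hypothetical stand-in for a node with a handler method."""

    def on_event(self, evt: object) -> str:
        return f"handled {evt!r}"


def module_level_handler(evt: object) -> str:
    return f"handled {evt!r}"


listener = Listener()
method_ref = weakref.WeakMethod(listener.on_event)
func_ref = weakref.ref(module_level_handler)

alive_before = method_ref() is not None  # the owner is still referenced

del listener  # drop the only strong reference to the owner
gc.collect()  # redundant under CPython refcounting, but makes the intent explicit

alive_after = method_ref() is not None  # the weak method now resolves to None
func_alive = func_ref() is not None     # the module namespace keeps the function alive
```

WeakMethod is needed because every attribute access such as listener.on_event creates a fresh bound-method object; a plain weakref.ref to it would be dead as soon as that temporary object is released.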
What does not work:
Local closures – the bus raises :class:TypeError at subscribe time. Use a method on a long-lived object instead.
functools.partial – same: the bus rejects it at subscribe time. Define a small wrapper method on the receiver.
# WRONG -- lambda (local closure) handler: subscribe raises TypeError
def on_ready(self):
    bus = self.tree.events
    bus.subscribe(PlayerDied, lambda evt: self._show_overlay(evt.player))

# RIGHT -- method on self
def on_ready(self):
    self.tree.events.subscribe(PlayerDied, self._on_player_died)

def _on_player_died(self, evt: PlayerDied) -> None:
    self._show_overlay(evt.player)
If the receiver genuinely needs partial application, bind the extra
arguments inside a small wrapper method on the receiver and subscribe
that method. Holding a partial as an instance attribute
(self._handler = partial(...)) and passing self._handler does not help:
the bus still fails at subscribe time, surfacing the bug immediately.
Exact-class dispatch – no MRO walk¶
Events fan out to handlers registered for the exact dataclass type. There is no MRO walk:
@dataclass(frozen=True)
class EnemyDied:
    enemy: Enemy

@dataclass(frozen=True)
class BossDefeated(EnemyDied):
    """Boss is also an enemy, but is its own event."""
    boss: Boss

# Handlers ONLY hear their exact type:
bus.subscribe(EnemyDied, on_any_enemy_died)    # NOT called for BossDefeated
bus.subscribe(BossDefeated, on_boss_defeated)  # called for BossDefeated only
bus.publish(BossDefeated(enemy=b, boss=b))     # only on_boss_defeated runs
This rule is deliberate. Two reasons:
Predictable dispatch. No surprise calls when refactoring class hierarchies. Adding a new event subclass cannot retroactively change what existing handlers see.
O(1) per publish. No type-graph walk; one dict lookup per event.
If you genuinely need a “match any subclass” behaviour, publish both events explicitly from the producer:
def die(self) -> None:
    self.tree.events.publish(BossDefeated(enemy=self, boss=self))
    self.tree.events.publish(EnemyDied(enemy=self))
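The exact-type rule amounts to keying the handler table on type(event). The toy bus below is a sketch of that idea only, not SimVX's implementation: TinyBus is a hypothetical class that holds strong references and omits weakrefs, validation, and deferral, and the events carry plain strings instead of nodes.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


class TinyBus:
    """Illustrative only: strong references, no weakrefs, no deferral."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        # One dict lookup on the exact type; base classes are never consulted.
        for handler in list(self._handlers.get(type(event), ())):
            handler(event)


@dataclass(frozen=True)
class EnemyDied:
    enemy: str


@dataclass(frozen=True)
class BossDefeated(EnemyDied):
    boss: str


calls: list[str] = []


def on_any_enemy_died(evt: EnemyDied) -> None:
    calls.append("enemy")


def on_boss_defeated(evt: BossDefeated) -> None:
    calls.append("boss")


bus = TinyBus()
bus.subscribe(EnemyDied, on_any_enemy_died)
bus.subscribe(BossDefeated, on_boss_defeated)

bus.publish(BossDefeated(enemy="lich", boss="lich"))  # only on_boss_defeated runs
bus.publish(EnemyDied(enemy="rat"))                   # only on_any_enemy_died runs
```

Because the lookup never consults EnemyDied when a BossDefeated is published, adding further subclasses later cannot change which handlers existing events reach.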
Synchronous vs deferred dispatch¶
bus.publish(event) # synchronous: handlers run before publish returns
bus.publish_deferred(event) # queued: handlers run on the next frame's flush
publish runs handlers in registration order on the calling thread,
then returns. Use it for events that must be observed before the next
line of the caller’s code.
publish_deferred queues the event. The :class:SceneTree calls
self.events.flush_deferred() once per process tick, before any
_process runs. Use it when:
You’re inside a _physics_process and want UI to react on the next visual frame, not mid-physics.
A handler might publish further events – publish_deferred keeps recursion shallow and avoids re-entering a handler before it has finished.
Many producers publish during one frame and you want one batch of UI updates, not N.
Events queued during a flush are dispatched on the next flush, not the current one. This bounds re-entrancy.
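One way to get the "queued during a flush, dispatched on the next flush" behaviour is to swap the queue out before iterating it. The sketch below models only that rule: DeferredBus and Ping are hypothetical, the bus holds strong references, and nothing here is taken from SimVX's source.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


class DeferredBus:
    """Illustrative only: models the queueing rule, nothing else."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = defaultdict(list)
        self._queue: list[object] = []

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        for handler in list(self._handlers.get(type(event), ())):
            handler(event)

    def publish_deferred(self, event: object) -> None:
        self._queue.append(event)

    def flush_deferred(self) -> int:
        # Swap the queue out first: anything a handler queues during this
        # flush lands in the fresh list and waits for the next flush.
        pending, self._queue = self._queue, []
        for event in pending:
            self.publish(event)
        return len(pending)


@dataclass(frozen=True)
class Ping:
    hops: int


log: list[int] = []
bus = DeferredBus()


def on_ping(evt: Ping) -> None:
    log.append(evt.hops)
    if evt.hops < 2:
        bus.publish_deferred(Ping(hops=evt.hops + 1))  # re-queue from inside a handler


bus.subscribe(Ping, on_ping)
bus.publish_deferred(Ping(hops=0))

first = bus.flush_deferred()   # dispatches hops=0 only
second = bus.flush_deferred()  # dispatches hops=1
third = bus.flush_deferred()   # dispatches hops=2
fourth = bus.flush_deferred()  # nothing left to dispatch
```

Each flush handles exactly one Ping here, even though every handler call queues another, which is the bounded re-entrancy the rule is meant to provide.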
Threading¶
The bus assumes a single-threaded engine. There are no locks. Calling
publish, publish_deferred, or subscribe from a worker thread is undefined
behaviour. If a worker needs to raise an event, first cross the thread
boundary with the engine’s own coroutine / task plumbing, then call
publish or publish_deferred from the main thread.
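The hand-off pattern can be sketched with a thread-safe queue.Queue standing in for the engine's plumbing, which this page does not describe. Everything below is an assumption made for illustration: AssetLoaded, inbox, worker, and drain_inbox are hypothetical, and the publish argument would be something like tree.events.publish in a real project.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class AssetLoaded:
    """Hypothetical event for this illustration."""

    path: str


# Thread-safe mailbox drained by the main thread; workers only put() into it.
inbox: "queue.Queue[AssetLoaded]" = queue.Queue()


def worker(path: str) -> None:
    # Runs off the main thread: never touches the bus, only the mailbox.
    inbox.put(AssetLoaded(path=path))


def drain_inbox(publish: Callable[[AssetLoaded], None]) -> int:
    """Call from the main thread, e.g. once per frame; returns events forwarded."""
    forwarded = 0
    while True:
        try:
            event = inbox.get_nowait()
        except queue.Empty:
            return forwarded
        publish(event)  # the bus is only ever called on the main thread
        forwarded += 1


threads = [threading.Thread(target=worker, args=(f"tex_{i}.png",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

received: list[AssetLoaded] = []
count = drain_inbox(received.append)  # received.append stands in for bus.publish
```

The order in which worker events arrive is not guaranteed, so handlers should not depend on it.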
The events autoload reservation¶
The name events on a :class:SceneTree is reserved for the
EventBus. The autoload registry rejects any project that tries to
register an autoload under that name:
# This raises at project load:
# ProjectSettings.from_toml(...) → ValueError
[autoload]
events = "my_game.scripts.MyEventManager"
A node attribute named events cannot shadow the bus: the bus is a
property on the tree and is always reached via self.tree.events, so the
path is unambiguous. (A node named events is unrelated and fine.)
The bus survives change_scene(): subscriptions held by autoloads or
other long-lived objects keep firing across scene swaps. Subscriptions
held by transient nodes drop automatically when those nodes are freed,
thanks to the weakref storage.
Worked example: Dungeon Explorer combat¶
Dungeon Explorer’s combat refactor uses the bus for everything that crosses node boundaries.
Define the events (games/dungeon_explorer/events.py):
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .nodes.boss_enemy import BossEnemy
    from .nodes.player import Player

@dataclass(frozen=True)
class PlayerDied:
    player: Player

@dataclass(frozen=True)
class PlayerLevelledUp:
    player: Player
    new_level: int

@dataclass(frozen=True)
class BossDefeated:
    boss: BossEnemy

@dataclass(frozen=True)
class BossPhaseChanged:
    boss: BossEnemy
    new_phase: int
Producer (nodes/player.py excerpt):
class Player(CharacterBody2D):
    def take_damage(self, amount: float) -> None:
        if self.hp <= 0:
            return
        self.hp = max(0, self.hp - amount)
        if self.hp == 0:
            self.tree.events.publish(PlayerDied(player=self))

    def gain_xp(self, amount: int) -> None:
        self.xp += amount
        while self.xp >= self._xp_for_next_level():
            self.xp -= self._xp_for_next_level()
            self.level += 1
            self.tree.events.publish(PlayerLevelledUp(player=self, new_level=self.level))
Handler (nodes/player_hud.py excerpt):
class PlayerHUD(Control):
    def on_ready(self) -> None:
        bus = self.tree.events
        bus.subscribe(PlayerDied, self._on_player_died)
        bus.subscribe(PlayerLevelledUp, self._on_levelled_up)
        bus.subscribe(BossDefeated, self._on_boss_defeated)

    def _on_player_died(self, evt: PlayerDied) -> None:
        self.show_death_overlay()

    def _on_levelled_up(self, evt: PlayerLevelledUp) -> None:
        self.flash_level_banner(evt.new_level)

    def _on_boss_defeated(self, evt: BossDefeated) -> None:
        self.show_victory_screen(evt.boss)
The HUD never imports Player or BossEnemy; it knows only the
event dataclasses. When the HUD itself is freed, its bound-method
subscriptions drop without any explicit unsubscribe, so the HUD needs
no __del__ of its own.
See :doc:save_system for the natural pairing: publish a SaveSucceeded
or SaveFailed event from the save UI handler so unrelated subsystems
(achievement tracker, telemetry) can react without coupling to the save
button itself.