"""Signal and Connection classes: observable event dispatcher.
Leaf module with no engine dependencies. Used by Property descriptors,
Selection, UndoStack, EventBus, and any node-attached signal.
"""
import inspect
import logging
import weakref
from collections.abc import Callable
log = logging.getLogger(__name__)
[docs]
class Connection:
"""Handle returned by ``Signal.connect()``. Can disconnect and acts as a callable proxy.
For bound methods on Nodes (objects exposing ``_outgoing_connections``), the
callback is held weakly and the connection auto-cleans when the node is
destroyed: matching Godot 4's signal lifecycle. Other callables (lambdas,
free functions, methods on non-Node objects) keep a strong reference.
"""
__slots__ = ('_signal', '_fn', '_weak', '_connected')
def __init__(self, signal: Signal, fn: Callable):
self._signal = signal
owner = getattr(fn, '__self__', None) if inspect.ismethod(fn) else None
if owner is not None and hasattr(owner, '_outgoing_connections'):
try:
self._weak = weakref.WeakMethod(fn)
self._fn = None
owner._outgoing_connections.append(self)
except TypeError:
self._weak = None
self._fn = fn
else:
self._weak = None
self._fn = fn
self._connected = True
[docs]
def disconnect(self):
"""Disconnect this callback from the signal."""
if self._connected:
self._signal._callbacks[:] = [c for c in self._signal._callbacks if c is not self]
self._connected = False
[docs]
@property
def connected(self) -> bool:
return self._connected
[docs]
def __call__(self, *args, **kwargs):
if self._weak is not None:
method = self._weak()
if method is None:
self._connected = False
return None
return method(*args, **kwargs)
return self._fn(*args, **kwargs)
def _is_alive(self) -> bool:
"""True if the callback can still be invoked (used by Signal pruning)."""
if not self._connected:
return False
if self._weak is not None and self._weak() is None:
return False
return True
[docs]
def __bool__(self):
return True
[docs]
def __repr__(self):
target = self._fn if self._weak is None else self._weak()
return f"Connection({target!r}, connected={self._connected})"
[docs]
class Signal:
"""Observable event dispatcher with optional type metadata.
Used as a class attribute, ``Signal`` is a non-data descriptor: each
instance accessing it lazily gets its own Signal copy stored in
``obj.__dict__``. Class-level access (``Cls.signal_name``) returns the
shared signal: useful for global event hubs like
``Node.script_error_raised``.
Example::
class Player(Node):
health_changed = Signal(int) # typed: emits one int
p1, p2 = Player(), Player()
p1.health_changed.connect(on_p1) # instance-scoped
p1.health_changed(50) # only p1 listeners fire
"""
__slots__ = ('_callbacks', '_types', '_name')
def __init__(self, *types: type):
self._callbacks: list[Connection] = []
self._types: tuple[type, ...] = types
self._name: str | None = None
[docs]
def __class_getitem__(cls, params) -> Signal:
"""Bracket syntax: ``Signal[int]``, ``Signal[int, str]``."""
if not isinstance(params, tuple):
params = (params,)
sig = cls.__new__(cls)
sig._callbacks = []
sig._types = params
sig._name = None
return sig
[docs]
def __set_name__(self, owner, name):
self._name = name
[docs]
def __get__(self, obj, objtype=None):
if obj is None or self._name is None:
return self
sig = obj.__dict__.get(self._name)
if sig is None:
sig = Signal.__new__(Signal)
sig._callbacks = []
sig._types = self._types
sig._name = self._name
obj.__dict__[self._name] = sig
return sig
def _validate_arity(self, fn: Callable) -> None:
"""Warn if *fn* cannot accept the number of args this signal emits."""
try:
sig = inspect.signature(fn)
params = sig.parameters.values()
has_var_positional = any(p.kind == p.VAR_POSITIONAL for p in params)
if has_var_positional:
return
max_params = sum(
1 for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
)
n_types = len(self._types)
if max_params < n_types:
type_names = ", ".join(t.__name__ if isinstance(t, type) else repr(t) for t in self._types)
log.warning(
"Signal(%s) connected to %r which accepts at most %d args (signal emits %d)",
type_names, fn, max_params, n_types,
)
except (ValueError, TypeError):
pass # Can't inspect (builtin, etc.): skip validation
[docs]
def connect(self, fn: Callable, *, once: bool = False) -> Connection:
"""Subscribe a callback. Returns a Connection handle.
Args:
fn: Callback to invoke on emit.
once: If True, auto-disconnect after first emit.
"""
if self._types:
self._validate_arity(fn)
if once:
original_fn = fn
conn_ref: list[Connection] = []
def _once_wrapper(*args, **kwargs):
original_fn(*args, **kwargs)
if conn_ref:
conn_ref[0].disconnect()
conn = Connection(self, _once_wrapper)
conn_ref.append(conn)
else:
conn = Connection(self, fn)
self._callbacks.append(conn)
return conn
[docs]
def disconnect(self, fn_or_conn):
"""Remove a previously connected callback or Connection."""
if isinstance(fn_or_conn, Connection):
fn_or_conn.disconnect()
return
# Match by stored fn for strong-ref connections, or by deref'd bound
# method for weak ones. Use == so bound methods compare correctly
# (bound methods create a new object on each attribute access).
def _target(c: Connection):
return c._weak() if c._weak is not None else c._fn
self._callbacks[:] = [c for c in self._callbacks if _target(c) != fn_or_conn]
[docs]
def __call__(self, *args, **kwargs):
"""Emit the signal, calling all connected callbacks with the given arguments.
Connections whose weak target was garbage-collected are skipped and
pruned from ``_callbacks`` after the dispatch loop. A callback that
disconnects another callback mid-emit only marks the peer
``_connected=False``: the live list still holds it, so an
unconditional rebuild after dispatch keeps the invariant that
``_callbacks`` only ever contains connected entries.
"""
snapshot = self._callbacks[:]
for cb in snapshot:
if not cb._connected:
continue
if cb._weak is not None and cb._weak() is None:
cb._connected = False
continue
cb(*args, **kwargs)
self._callbacks[:] = [c for c in self._callbacks if c._connected]
[docs]
def clear(self):
"""Remove all connected callbacks."""
self._callbacks.clear()
[docs]
def disconnect_from_module(self, module_name: str) -> int:
"""Drop all connections whose callback is defined in ``module_name``.
Used by the hot-reload system to release references to old-module
code (lambdas, free functions, methods) before reload, so the old
module can be garbage-collected.
Returns the number of connections dropped.
"""
def _module_of(c: Connection) -> str | None:
target = c._weak() if c._weak is not None else c._fn
return getattr(target, '__module__', None) if target is not None else None
before = len(self._callbacks)
self._callbacks = [c for c in self._callbacks if _module_of(c) != module_name]
return before - len(self._callbacks)
emit = __call__
[docs]
def __repr__(self):
if self._types:
type_names = ", ".join(t.__name__ if isinstance(t, type) else repr(t) for t in self._types)
return f"Signal({type_names}, connections={len(self._callbacks)})"
return f"Signal(connections={len(self._callbacks)})"