"""CPU-side particle emitter system with sub-emitters, collision, trails, and deterministic seeding."""
from __future__ import annotations
import logging
import numpy as np
from .nodes_3d.node3d import Node3D
from .descriptors import Property, Signal
log = logging.getLogger(__name__)
__all__ = ["ParticleEmitter", "PARTICLE_DTYPE"]
# Must match GLSL std430 layout — vec3 fields need 4-byte padding to align to 16 bytes.
PARTICLE_DTYPE = np.dtype(
[
("position", np.float32, 3),
("_pad0", np.float32), # std430 vec3 alignment
("velocity", np.float32, 3),
("_pad1", np.float32), # std430 vec3 alignment
("colour", np.float32, 4),
("scale", np.float32),
("lifetime", np.float32),
("age", np.float32),
("_pad", np.float32),
]
)
[docs]
class ParticleEmitter(Node3D):
"""CPU-driven particle emitter with sub-emitters, collision, trails, and deterministic seeding.
Emits particles that move, scale, and recolour over their lifetime.
The graphics backend reads ``particle_data`` each frame for billboard
rendering. Supports emission shapes (point, sphere, box), collision
with a ground plane, particle trails, and sub-emitters that fire on
birth or death events.
Attributes:
amount: Maximum number of live particles (pool size).
lifetime: How long each particle lives, in seconds.
emission_rate: Particles emitted per second.
one_shot: When ``True``, stops emitting after the pool is filled.
emitting: Whether the emitter is actively spawning particles.
gravity: Gravity vector applied to all particles each physics step.
collision_enabled: Enable ground-plane collision.
trail_enabled: Enable per-particle ribbon trails.
Example::
emitter = ParticleEmitter(amount=200, lifetime=1.5, emission_rate=80)
emitter.gravity = (0.0, -4.9, 0.0)
emitter.start_colour = (1.0, 0.8, 0.2, 1.0)
emitter.end_colour = (1.0, 0.0, 0.0, 0.0)
scene.add_child(emitter)
"""
amount = Property(100, group="Emission")
lifetime = Property(2.0, group="Emission")
emission_rate = Property(50.0, group="Emission")
one_shot = Property(False, group="Emission")
emitting = Property(True, group="Emission")
emission_shape = Property("point", group="Emission")
emission_radius = Property(1.0, group="Emission")
emission_box = Property((1.0, 1.0, 1.0), group="Emission")
initial_velocity = Property((0.0, 5.0, 0.0), group="Movement")
velocity_spread = Property(0.3, group="Movement")
gravity = Property((0.0, -9.8, 0.0), group="Movement")
damping = Property(0.0, group="Movement")
start_scale = Property(1.0, group="Appearance")
end_scale = Property(0.0, group="Appearance")
start_colour = Property((1.0, 1.0, 1.0, 1.0), group="Appearance")
end_colour = Property((1.0, 1.0, 1.0, 0.0), group="Appearance")
gpu_simulation = Property(False)
collision_enabled = Property(False, group="Collision")
collision_bounce = Property(0.5, group="Collision")
collision_friction = Property(0.2, group="Collision")
collision_mode = Property("bounce", group="Collision")
collision_plane_y = Property(0.0, group="Collision")
trail_enabled = Property(False, group="Trail")
trail_length = Property(5, group="Trail")
trail_width = Property(0.1, group="Trail")
particle_born = Signal()
particle_died = Signal()
def __init__(self, seed: int | None = None, **kwargs):
super().__init__(**kwargs)
self._particles: np.ndarray | None = None
self._alive_count: int = 0
self._emit_accum: float = 0.0
self._seed = seed
self._rng = np.random.default_rng(seed)
self.sub_emitter_birth: ParticleEmitter | None = None
self.sub_emitter_death: ParticleEmitter | None = None
self._trail = None
# Cached numpy arrays to avoid per-frame allocations
self._grav_cache: np.ndarray | None = None
self._sc_cache: np.ndarray | None = None
self._ec_cache: np.ndarray | None = None
self._eb_cache: np.ndarray | None = None
self._iv_cache: np.ndarray | None = None
self._grav_src: tuple | None = None
self._sc_src: tuple | None = None
self._ec_src: tuple | None = None
self._eb_src: tuple | None = None
self._iv_src: tuple | None = None
self._pos_cache = np.zeros(3, dtype=np.float32)
self._dir_buf = np.zeros(3, dtype=np.float32)
@property
def seed(self) -> int | None:
return self._seed
@seed.setter
def seed(self, value: int | None):
self._seed = value
self._rng = np.random.default_rng(value)
@property
def emitter_config(self) -> dict:
return {
"emitter_pos": tuple(self.world_position),
"gravity": tuple(self.gravity) if hasattr(self.gravity, "__iter__") else (0, self.gravity, 0),
"damping": float(self.damping),
"initial_velocity": tuple(self.initial_velocity),
"velocity_spread": float(self.velocity_spread),
"start_colour": tuple(self.start_colour),
"end_colour": tuple(self.end_colour),
"start_scale": float(self.start_scale),
"end_scale": float(self.end_scale),
"emission_radius": float(self.emission_radius),
"max_particles": int(self.amount),
}
def _sync_caches(self):
"""Rebuild cached numpy arrays only when the underlying Property values change."""
grav = self.gravity
if grav != self._grav_src:
self._grav_cache = np.array(grav, dtype=np.float32)
self._grav_src = grav
sc = self.start_colour
if sc != self._sc_src:
self._sc_cache = np.array(sc, dtype=np.float32)
self._sc_src = sc
ec = self.end_colour
if ec != self._ec_src:
self._ec_cache = np.array(ec, dtype=np.float32)
self._ec_src = ec
eb = self.emission_box
if eb != self._eb_src:
self._eb_cache = np.array(eb, dtype=np.float32)
self._eb_src = eb
iv = self.initial_velocity
if iv != self._iv_src:
self._iv_cache = np.array(iv, dtype=np.float32)
self._iv_src = iv
[docs]
def ready(self):
lt = float(self.lifetime)
rate = float(self.emission_rate)
if lt <= 0:
log.warning("%s: lifetime is zero or negative (%.4f), particles expire immediately", self.name, lt)
if rate <= 0:
log.warning("%s: emission_rate is zero or negative (%.4f), no particles will spawn", self.name, rate)
max_p = int(self.amount)
self._particles = np.zeros(max_p, dtype=PARTICLE_DTYPE)
self._alive_count = 0
self._sync_caches()
if self.trail_enabled:
from .particle_trail import ParticleTrailData
self._trail = ParticleTrailData(max_p, int(self.trail_length), float(self.trail_width))
log.debug("%s: emitter ready, pool=%d, rate=%.1f/s, lifetime=%.2fs", self.name, max_p, rate, lt)
[docs]
def restart(self):
self._rng = np.random.default_rng(self._seed)
self._alive_count = 0
self._emit_accum = 0.0
if self._particles is not None:
self._particles["age"] = 0.0
if self._trail is not None:
self._trail.clear()
[docs]
def physics_process(self, dt: float):
if self._particles is None or self.gpu_simulation:
return
self._sync_caches()
self._pos_cache[:] = self.world_position
max_p = len(self._particles)
born_positions: list[np.ndarray] = []
if self.emitting:
self._emit_accum += float(self.emission_rate) * dt
while self._emit_accum >= 1.0 and self._alive_count < max_p:
pos = self._emit_one()
born_positions.append(pos)
self._emit_accum -= 1.0
if self.one_shot and self._alive_count >= max_p:
self.emitting = False
log.debug("%s: one-shot pool full, emitter stopped (alive=%d)", self.name, self._alive_count)
break
if self._alive_count == 0:
return
if born_positions and self.sub_emitter_birth is not None:
for bp in born_positions:
self._fire_sub_emitter(self.sub_emitter_birth, bp)
if born_positions and self.particle_born._callbacks:
self.particle_born.emit(born_positions)
alive = self._particles[: self._alive_count]
dt_f = np.float32(dt)
alive["age"] += dt_f
expired = alive["age"] >= alive["lifetime"]
death_positions: list[np.ndarray] | None = None
if expired.any():
if self.sub_emitter_death is not None or self.particle_died._callbacks:
death_positions = [alive["position"][i].copy() for i in np.where(expired)[0]]
live_mask = ~expired
live_count = int(live_mask.sum())
expired_count = self._alive_count - live_count
self._particles[:live_count] = alive[live_mask]
self._alive_count = live_count
alive = self._particles[: self._alive_count]
log.debug("%s: %d particles expired, alive=%d", self.name, expired_count, self._alive_count)
if self._alive_count == 0:
self._fire_death_events(death_positions)
return
self._fire_death_events(death_positions)
alive["velocity"] += self._grav_cache * dt_f
damp = float(self.damping)
if damp > 0:
alive["velocity"] *= max(0.0, 1.0 - damp * dt)
alive["position"] += alive["velocity"] * dt_f
if self.collision_enabled:
self._process_collisions(alive)
t = np.clip(alive["age"] / alive["lifetime"], 0, 1)
alive["colour"] = self._sc_cache + (self._ec_cache - self._sc_cache) * t[:, np.newaxis]
alive["scale"] = float(self.start_scale) + (float(self.end_scale) - float(self.start_scale)) * t
if self._trail is not None:
self._trail.update(alive)
def _process_collisions(self, alive: np.ndarray):
plane_y = float(self.collision_plane_y)
mode = str(self.collision_mode)
below = alive["position"][:, 1] < plane_y
if not below.any():
return
if mode == "destroy":
alive["age"][below] = alive["lifetime"][below] + 1.0
elif mode == "stick":
alive["position"][below, 1] = plane_y
alive["velocity"][below] = 0.0
else:
alive["position"][below, 1] = plane_y + (plane_y - alive["position"][below, 1])
alive["velocity"][below, 1] *= -float(self.collision_bounce)
alive["velocity"][below, 0] *= 1.0 - float(self.collision_friction)
alive["velocity"][below, 2] *= 1.0 - float(self.collision_friction)
def _fire_death_events(self, death_positions: list[np.ndarray] | None):
if death_positions is None:
return
if self.sub_emitter_death is not None:
for dp in death_positions:
self._fire_sub_emitter(self.sub_emitter_death, dp)
if self.particle_died._callbacks:
self.particle_died.emit(death_positions)
@staticmethod
def _fire_sub_emitter(sub: ParticleEmitter, position: np.ndarray):
if sub._particles is None:
return
sub._sync_caches()
max_p = len(sub._particles)
burst = min(int(sub.emission_rate), max_p - sub._alive_count)
for _ in range(burst):
if sub._alive_count >= max_p:
break
p = sub._particles[sub._alive_count]
pos = position.copy()
shape = str(sub.emission_shape)
if shape == "sphere":
r = float(sub.emission_radius)
sub._dir_buf[:] = sub._rng.standard_normal(3)
sub._dir_buf /= max(np.linalg.norm(sub._dir_buf), 1e-6)
pos += sub._dir_buf * sub._rng.uniform(0, r)
elif shape == "box":
pos += (sub._rng.random(3).astype(np.float32) - 0.5) * sub._eb_cache
p["position"] = pos
vel = sub._iv_cache.copy()
spread = float(sub.velocity_spread)
if spread > 0:
vel += sub._rng.standard_normal(3).astype(np.float32) * spread * max(np.linalg.norm(vel), 1e-6)
p["velocity"] = vel
p["colour"] = sub.start_colour
p["scale"] = float(sub.start_scale)
p["lifetime"] = float(sub.lifetime)
p["age"] = 0.0
sub._alive_count += 1
def _emit_one(self) -> np.ndarray:
p = self._particles[self._alive_count]
pos = self._pos_cache.copy()
shape = str(self.emission_shape)
if shape == "sphere":
r = float(self.emission_radius)
self._dir_buf[:] = self._rng.standard_normal(3)
self._dir_buf /= max(np.linalg.norm(self._dir_buf), 1e-6)
pos += self._dir_buf * self._rng.uniform(0, r)
elif shape == "box":
pos += (self._rng.random(3).astype(np.float32) - 0.5) * self._eb_cache
p["position"] = pos
vel = self._iv_cache.copy()
spread = float(self.velocity_spread)
if spread > 0:
vel += self._rng.standard_normal(3).astype(np.float32) * spread * np.linalg.norm(vel)
p["velocity"] = vel
p["colour"] = self.start_colour
p["scale"] = float(self.start_scale)
p["lifetime"] = float(self.lifetime)
p["age"] = 0.0
self._alive_count += 1
return pos.copy()
@property
def alive_count(self) -> int:
return self._alive_count
@property
def particle_data(self) -> np.ndarray | None:
if self._particles is None or self._alive_count == 0:
return None
return self._particles[: self._alive_count]
@property
def trail_data(self):
return self._trail
@property
def trail_vertices(self) -> np.ndarray | None:
if self._trail is None:
return None
return self._trail.get_trail_vertices(self._particles[: self._alive_count] if self._alive_count > 0 else None)