# Source code for simvx.core.particles

"""CPU-side particle emitter system with sub-emitters, collision, trails, and deterministic seeding."""


from __future__ import annotations

import logging

import numpy as np

from .nodes_3d.node3d import Node3D
from .descriptors import Property, Signal

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Names exported by a star-import of this module.
__all__ = ["ParticleEmitter", "PARTICLE_DTYPE"]

# Per-particle record shared with the GPU side. The field order and the
# explicit padding floats must stay in step with the GLSL std430 struct:
# every vec3 is followed by one float so the next member starts on a
# 16-byte boundary. Packed size is 64 bytes per particle.
PARTICLE_DTYPE = np.dtype(
    [
        ("position", np.float32, (3,)),  # bytes 0..11
        ("_pad0", np.float32),  # bytes 12..15, fills position out to 16 bytes
        ("velocity", np.float32, (3,)),  # bytes 16..27
        ("_pad1", np.float32),  # bytes 28..31, fills velocity out to 16 bytes
        ("colour", np.float32, (4,)),  # bytes 32..47
        ("scale", np.float32),  # bytes 48..51
        ("lifetime", np.float32),  # bytes 52..55
        ("age", np.float32),  # bytes 56..59
        ("_pad", np.float32),  # bytes 60..63, keeps the record size a multiple of 16
    ]
)


class ParticleEmitter(Node3D):
    """CPU-driven particle emitter with sub-emitters, collision, trails, and deterministic seeding.

    Emits particles that move, scale, and recolour over their lifetime. The
    graphics backend reads ``particle_data`` each frame for billboard rendering.

    Supports emission shapes (point, sphere, box), collision with a ground
    plane, particle trails, and sub-emitters that fire on birth or death events.

    Attributes:
        amount: Maximum number of live particles (pool size).
        lifetime: How long each particle lives, in seconds.
        emission_rate: Particles emitted per second.
        one_shot: When ``True``, stops emitting after the pool is filled.
        emitting: Whether the emitter is actively spawning particles.
        gravity: Gravity vector applied to all particles each physics step.
            A scalar is treated as gravity along the Y axis.
        collision_enabled: Enable ground-plane collision.
        trail_enabled: Enable per-particle ribbon trails.
        sub_emitter_birth: Optional emitter that receives a burst at the
            position of every particle born here.
        sub_emitter_death: Optional emitter that receives a burst at the
            position of every particle that expires here.

    Example::

        emitter = ParticleEmitter(amount=200, lifetime=1.5, emission_rate=80)
        emitter.gravity = (0.0, -4.9, 0.0)
        emitter.start_colour = (1.0, 0.8, 0.2, 1.0)
        emitter.end_colour = (1.0, 0.0, 0.0, 0.0)
        scene.add_child(emitter)
    """

    amount = Property(100, group="Emission")
    lifetime = Property(2.0, group="Emission")
    emission_rate = Property(50.0, group="Emission")
    one_shot = Property(False, group="Emission")
    emitting = Property(True, group="Emission")
    emission_shape = Property("point", group="Emission")
    emission_radius = Property(1.0, group="Emission")
    emission_box = Property((1.0, 1.0, 1.0), group="Emission")

    initial_velocity = Property((0.0, 5.0, 0.0), group="Movement")
    velocity_spread = Property(0.3, group="Movement")
    gravity = Property((0.0, -9.8, 0.0), group="Movement")
    damping = Property(0.0, group="Movement")

    start_scale = Property(1.0, group="Appearance")
    end_scale = Property(0.0, group="Appearance")
    start_colour = Property((1.0, 1.0, 1.0, 1.0), group="Appearance")
    end_colour = Property((1.0, 1.0, 1.0, 0.0), group="Appearance")

    gpu_simulation = Property(False)

    collision_enabled = Property(False, group="Collision")
    collision_bounce = Property(0.5, group="Collision")
    collision_friction = Property(0.2, group="Collision")
    collision_mode = Property("bounce", group="Collision")
    collision_plane_y = Property(0.0, group="Collision")

    trail_enabled = Property(False, group="Trail")
    trail_length = Property(5, group="Trail")
    trail_width = Property(0.1, group="Trail")

    particle_born = Signal()
    particle_died = Signal()

    def __init__(self, seed: int | None = None, **kwargs):
        """Create an emitter; the particle pool itself is allocated in ``ready``.

        Args:
            seed: Seed for the emitter's random generator. ``None`` lets
                NumPy pick fresh entropy.
            **kwargs: Forwarded unchanged to the base-class constructor.
        """
        super().__init__(**kwargs)
        self._particles: np.ndarray | None = None
        self._alive_count: int = 0
        self._emit_accum: float = 0.0
        self._seed = seed
        self._rng = np.random.default_rng(seed)
        self.sub_emitter_birth: ParticleEmitter | None = None
        self.sub_emitter_death: ParticleEmitter | None = None
        self._trail = None

        # Cached numpy arrays to avoid per-frame allocations. Each ``*_src``
        # holds the tuple the matching ``*_cache`` array was built from.
        self._grav_cache: np.ndarray | None = None
        self._sc_cache: np.ndarray | None = None
        self._ec_cache: np.ndarray | None = None
        self._eb_cache: np.ndarray | None = None
        self._iv_cache: np.ndarray | None = None
        self._grav_src: tuple | None = None
        self._sc_src: tuple | None = None
        self._ec_src: tuple | None = None
        self._eb_src: tuple | None = None
        self._iv_src: tuple | None = None
        self._pos_cache = np.zeros(3, dtype=np.float32)
        self._dir_buf = np.zeros(3, dtype=np.float32)

    @property
    def seed(self) -> int | None:
        """Seed of the random generator; assigning it rebuilds the generator."""
        return self._seed

    @seed.setter
    def seed(self, value: int | None):
        self._seed = value
        self._rng = np.random.default_rng(value)

    @property
    def emitter_config(self) -> dict:
        """Snapshot of the emitter settings as plain tuples, floats and ints.

        A scalar ``gravity`` is reported as ``(0, gravity, 0)``.
        NOTE(review): presumably consumed by the GPU simulation path — confirm.
        """
        gravity = self.gravity
        return {
            "emitter_pos": tuple(self.world_position),
            "gravity": tuple(gravity) if hasattr(gravity, "__iter__") else (0, gravity, 0),
            "damping": float(self.damping),
            "initial_velocity": tuple(self.initial_velocity),
            "velocity_spread": float(self.velocity_spread),
            "start_colour": tuple(self.start_colour),
            "end_colour": tuple(self.end_colour),
            "start_scale": float(self.start_scale),
            "end_scale": float(self.end_scale),
            "emission_radius": float(self.emission_radius),
            "max_particles": int(self.amount),
        }

    def _sync_caches(self):
        """Rebuild cached numpy arrays only when the underlying Property values change.

        Values are normalised to tuples before comparison so that lists,
        numpy arrays and other iterables compare to a plain ``bool`` instead
        of an element-wise result.
        """
        # A scalar gravity means "along Y", matching ``emitter_config``;
        # without this it would broadcast onto all three velocity axes.
        gravity = self.gravity
        gravity_key = tuple(gravity) if hasattr(gravity, "__iter__") else (0.0, gravity, 0.0)
        if gravity_key != self._grav_src:
            self._grav_cache = np.array(gravity_key, dtype=np.float32)
            self._grav_src = gravity_key

        for prop_name, src_attr, cache_attr in (
            ("start_colour", "_sc_src", "_sc_cache"),
            ("end_colour", "_ec_src", "_ec_cache"),
            ("emission_box", "_eb_src", "_eb_cache"),
            ("initial_velocity", "_iv_src", "_iv_cache"),
        ):
            key = tuple(getattr(self, prop_name))
            if key != getattr(self, src_attr):
                setattr(self, cache_attr, np.array(key, dtype=np.float32))
                setattr(self, src_attr, key)

    def ready(self):
        """Allocate the particle pool (and trail storage, if enabled).

        Logs a warning when ``lifetime`` or ``emission_rate`` is not positive.
        """
        lt = float(self.lifetime)
        rate = float(self.emission_rate)
        if lt <= 0:
            log.warning("%s: lifetime is zero or negative (%.4f), particles expire immediately", self.name, lt)
        if rate <= 0:
            log.warning("%s: emission_rate is zero or negative (%.4f), no particles will spawn", self.name, rate)

        max_p = int(self.amount)
        self._particles = np.zeros(max_p, dtype=PARTICLE_DTYPE)
        self._alive_count = 0
        self._sync_caches()

        if self.trail_enabled:
            # Imported lazily so the trail module is only loaded when used.
            from .particle_trail import ParticleTrailData

            self._trail = ParticleTrailData(max_p, int(self.trail_length), float(self.trail_width))
        log.debug("%s: emitter ready, pool=%d, rate=%.1f/s, lifetime=%.2fs", self.name, max_p, rate, lt)

    def restart(self):
        """Reset the simulation to its initial state.

        Re-creates the random generator from the stored seed, drops all live
        particles, zeroes the emission accumulator and clears any trail data.
        The ``emitting`` flag is left untouched.
        """
        self._rng = np.random.default_rng(self._seed)
        self._alive_count = 0
        self._emit_accum = 0.0
        if self._particles is not None:
            self._particles["age"] = 0.0
        if self._trail is not None:
            self._trail.clear()

    def physics_process(self, dt: float):
        """Advance the CPU simulation by ``dt`` seconds.

        Does nothing before ``ready`` has allocated the pool or when
        ``gpu_simulation`` is set. Otherwise, in order: emits new particles,
        fires birth events, ages and removes expired particles, fires death
        events, integrates gravity/damping/velocity, resolves collisions,
        interpolates colour and scale, and updates trails.

        Args:
            dt: Time step in seconds.
        """
        if self._particles is None or self.gpu_simulation:
            return

        self._sync_caches()
        self._pos_cache[:] = self.world_position

        born_positions = self._emit_pending(dt) if self.emitting else []
        if self._alive_count == 0:
            return

        if born_positions:
            if self.sub_emitter_birth is not None:
                for spawn_pos in born_positions:
                    self._fire_sub_emitter(self.sub_emitter_birth, spawn_pos)
            if self.particle_born._callbacks:
                self.particle_born.emit(born_positions)

        alive = self._particles[: self._alive_count]
        dt_f = np.float32(dt)
        alive["age"] += dt_f

        death_positions = self._cull_expired(alive)
        alive = self._particles[: self._alive_count]
        # Decide before notifying: a death handler may add particles to this
        # emitter, and those must not be simulated until the next step.
        none_left = self._alive_count == 0
        self._fire_death_events(death_positions)
        if none_left:
            return

        alive["velocity"] += self._grav_cache * dt_f
        damp = float(self.damping)
        if damp > 0:
            alive["velocity"] *= max(0.0, 1.0 - damp * dt)
        alive["position"] += alive["velocity"] * dt_f

        if self.collision_enabled:
            self._process_collisions(alive)

        # t is the normalised age in [0, 1] used to blend start -> end values.
        t = np.clip(alive["age"] / alive["lifetime"], 0, 1)
        alive["colour"] = self._sc_cache + (self._ec_cache - self._sc_cache) * t[:, np.newaxis]
        scale_from = float(self.start_scale)
        scale_to = float(self.end_scale)
        alive["scale"] = scale_from + (scale_to - scale_from) * t

        if self._trail is not None:
            self._trail.update(alive)

    def _emit_pending(self, dt: float) -> list[np.ndarray]:
        """Emit the particles that became due during ``dt``; return their spawn positions."""
        born: list[np.ndarray] = []
        capacity = len(self._particles)
        # A negative rate must not drive the accumulator below zero, or
        # emission would stall long after the rate is made positive again.
        self._emit_accum += max(float(self.emission_rate), 0.0) * dt
        while self._emit_accum >= 1.0 and self._alive_count < capacity:
            born.append(self._emit_one())
            self._emit_accum -= 1.0
            if self.one_shot and self._alive_count >= capacity:
                self.emitting = False
                log.debug("%s: one-shot pool full, emitter stopped (alive=%d)", self.name, self._alive_count)
                break
        if self._alive_count >= capacity:
            # The pool is full, so drop the backlog that could not be emitted.
            # Otherwise it grows without bound and refills the whole pool in a
            # single frame as soon as slots free up.
            self._emit_accum = min(self._emit_accum, 1.0)
        return born

    def _cull_expired(self, alive: np.ndarray) -> list[np.ndarray] | None:
        """Remove particles whose age reached their lifetime and compact the pool.

        Returns:
            Positions of the removed particles when something expired and a
            death sub-emitter or ``particle_died`` listener exists, else ``None``.
        """
        expired = alive["age"] >= alive["lifetime"]
        if not expired.any():
            return None

        death_positions: list[np.ndarray] | None = None
        if self.sub_emitter_death is not None or self.particle_died._callbacks:
            # Boolean indexing copies, so these survive the compaction below.
            death_positions = list(alive["position"][expired])

        survivors = alive[~expired]
        live_count = len(survivors)
        expired_count = self._alive_count - live_count
        self._particles[:live_count] = survivors
        self._alive_count = live_count
        log.debug("%s: %d particles expired, alive=%d", self.name, expired_count, self._alive_count)
        return death_positions

    def _process_collisions(self, alive: np.ndarray):
        """Resolve particles below ``collision_plane_y`` according to ``collision_mode``.

        ``"destroy"`` ages the particle past its lifetime, ``"stick"`` pins it
        to the plane with zero velocity, and any other value bounces it:
        the position is mirrored about the plane, vertical velocity is
        reversed and scaled by ``collision_bounce``, and horizontal velocity
        is scaled by ``1 - collision_friction``.
        """
        plane_y = float(self.collision_plane_y)
        below = alive["position"][:, 1] < plane_y
        if not below.any():
            return

        mode = str(self.collision_mode)
        if mode == "destroy":
            alive["age"][below] = alive["lifetime"][below] + 1.0
            return
        if mode == "stick":
            alive["position"][below, 1] = plane_y
            alive["velocity"][below] = 0.0
            return

        alive["position"][below, 1] = plane_y + (plane_y - alive["position"][below, 1])
        alive["velocity"][below, 1] *= -float(self.collision_bounce)
        keep = 1.0 - float(self.collision_friction)
        alive["velocity"][below, 0] *= keep
        alive["velocity"][below, 2] *= keep

    def _fire_death_events(self, death_positions: list[np.ndarray] | None):
        """Notify the death sub-emitter and ``particle_died`` listeners, if any."""
        if death_positions is None:
            return
        if self.sub_emitter_death is not None:
            for death_pos in death_positions:
                self._fire_sub_emitter(self.sub_emitter_death, death_pos)
        if self.particle_died._callbacks:
            self.particle_died.emit(death_positions)

    @staticmethod
    def _fire_sub_emitter(sub: ParticleEmitter, position: np.ndarray):
        """Spawn a burst of particles from ``sub`` around ``position``.

        The burst size is ``int(sub.emission_rate)``, limited to the free
        slots in the sub-emitter's pool. Does nothing if ``sub`` has no pool yet.
        """
        if sub._particles is None:
            return
        sub._sync_caches()
        free_slots = len(sub._particles) - sub._alive_count
        for _ in range(min(int(sub.emission_rate), free_slots)):
            sub._spawn_at(position)

    def _emit_one(self) -> np.ndarray:
        """Spawn one particle at the emitter's cached world position; return its position."""
        return self._spawn_at(self._pos_cache)

    def _spawn_at(self, origin: np.ndarray) -> np.ndarray:
        """Initialise the next free pool slot with a particle spawned around ``origin``.

        The caller must ensure a free slot exists. ``origin`` is not modified.

        Returns:
            The spawn position, as a fresh array independent of the pool.
        """
        slot = self._particles[self._alive_count]
        pos = origin.copy()

        shape = str(self.emission_shape)
        if shape == "sphere":
            # Random direction (normalised Gaussian sample) times a uniformly
            # drawn distance within the emission radius.
            radius = float(self.emission_radius)
            self._dir_buf[:] = self._rng.standard_normal(3)
            self._dir_buf /= max(np.linalg.norm(self._dir_buf), 1e-6)
            pos += self._dir_buf * self._rng.uniform(0, radius)
        elif shape == "box":
            # Uniform offset within the box centred on the origin.
            pos += (self._rng.random(3).astype(np.float32) - 0.5) * self._eb_cache
        slot["position"] = pos

        vel = self._iv_cache.copy()
        spread = float(self.velocity_spread)
        if spread > 0:
            # Random perturbation proportional to the initial speed.
            vel += self._rng.standard_normal(3).astype(np.float32) * spread * np.linalg.norm(vel)
        slot["velocity"] = vel

        slot["colour"] = self.start_colour
        slot["scale"] = float(self.start_scale)
        slot["lifetime"] = float(self.lifetime)
        slot["age"] = 0.0
        self._alive_count += 1
        return pos

    @property
    def alive_count(self) -> int:
        """Number of currently live particles."""
        return self._alive_count

    @property
    def particle_data(self) -> np.ndarray | None:
        """View of the live particles, or ``None`` when the pool is unallocated or empty."""
        if self._particles is None or self._alive_count == 0:
            return None
        return self._particles[: self._alive_count]

    @property
    def trail_data(self):
        """The trail storage object, or ``None`` when trails were not enabled in ``ready``."""
        return self._trail

    @property
    def trail_vertices(self) -> np.ndarray | None:
        """Trail vertices for the live particles, or ``None`` when trails are disabled."""
        if self._trail is None:
            return None
        live = self._particles[: self._alive_count] if self._alive_count > 0 else None
        return self._trail.get_trail_vertices(live)