"""GPU compute shader particle simulation.
Dispatches a compute shader to update particle positions, velocities, lifetimes,
and visual properties entirely on the GPU, avoiding per-frame CPU-to-GPU uploads
of particle data.
Mirrors the web backend's ``GPUParticlePass`` (``gpu_particle_pass.js``): each
``GPUParticles3D`` / ``GPUParticles2D`` emitter owns a persistent SSBO keyed by
a stable ``emitter_id`` (the truncated ``id(node)`` minted in
``scene_adapter.py``). Multiple emitters in the same scene render
independently; an emitter that leaves the tree has its SSBO and descriptor
pool released via :meth:`prune_inactive`.
"""
import logging
from dataclasses import dataclass, field
from typing import Any
import numpy as np
import vulkan as vk
from ..gpu.descriptors import (
DescriptorWriteBatch,
allocate_descriptor_set,
create_descriptor_set_layout,
create_pool_for_types,
)
from ..gpu.memory import create_buffer, upload_numpy
from ..gpu.pipeline_compute import create_compute_pipeline
__all__ = ["ParticleCompute"]
log = logging.getLogger(__name__)
# Size in bytes of one particle record in the SSBO. Must match PARTICLE_DTYPE
# from core/particles.py (16 floats x 4 bytes = 64 bytes).
_PARTICLE_GPU_STRIDE = 16 * 4
# Number of particles covered by one workgroup when sizing the dispatch.
# NOTE(review): presumably has to equal local_size_x in particle_sim.comp;
# confirm against the shader.
_WORKGROUP_SIZE = 256
# VK_WHOLE_SIZE as an unsigned uint64. The vulkan Python package exposes it as
# -1 (signed), which triggers OverflowError when assigned to a cffi unsigned
# field, so the all-ones bit pattern is spelled out here instead.
_VK_WHOLE_SIZE_U64 = 0xFFFFFFFFFFFFFFFF
# Push constant layout (must match particle_sim.comp). Each row is one
# 16-byte group:
#   vec3 emitter_pos      (12) + float dt         (4) = 16
#   vec3 gravity          (12) + float damping    (4) = 16
#   vec3 initial_velocity (12) + float vel_spread (4) = 16
#   vec4 start_colour     (16)                        = 16
#   vec4 end_colour       (16)                        = 16
#   float start_scale (4) + float end_scale (4)
#     + float emission_radius (4) + uint max_particles (4) = 16
#   uint frame_seed (4) + 3x uint pad (12)            = 16
# Total = 7 x 16 = 112 bytes
_PUSH_CONSTANT_SIZE = 112
@dataclass
class _EmitterSlot:
"""Per-emitter GPU resources owned by :class:`ParticleCompute`.
Each ``GPUParticles*`` node maps to one slot. The compute SSBO is bound
to ``compute_desc_set`` for the compute pass and to ``graphics_desc_set``
for the billboard render pass (two pools because the layouts differ).
Frame counter is per-slot so each emitter's PCG seed advances
independently: same convention as the web ``GPUParticlePass``.
"""
max_particles: int
particle_buf: Any
particle_mem: Any
compute_pool: Any
compute_desc_set: Any
graphics_pool: Any = None
graphics_desc_set: Any = None
frame_counter: int = 0
# Set each frame ``dispatch()`` is called; ``render()`` iterates only the
# slots that were touched this frame (an emitter may stop emitting
# without being removed from the tree).
last_active_frame: int = field(default=-1)
[docs]
class ParticleCompute:
    """Vulkan compute-shader particle simulator.

    A single compute pipeline is shared by all emitters, while every emitter
    gets its own SSBO. The lifecycle follows the web ``GPUParticlePass``:
    ``dispatch()`` creates the slot for an ``emitter_id`` on first sight, and
    ``prune_inactive()`` frees slots whose ids have left the scene tree.
    """

    def __init__(self, engine: Any):
        self._engine = engine

        # Emitter slots keyed by the stable emitter_id (id(node) & 0xFFFFFFFF).
        self._emitters: dict[int, _EmitterSlot] = {}

        # Frame stamp, advanced by ``begin_frame``. ``dispatch`` copies it into
        # the slot's ``last_active_frame`` so ``render`` can pick out the
        # emitters that were dispatched during the current frame.
        self._frame_index = 0

        # Pipeline objects shared by every emitter; populated once in setup().
        self._ready = False
        self._desc_layout: Any = None
        self._compute_module: Any = None
        self._compute_layout: Any = None
        self._compute_pipeline: Any = None
[docs]
def setup(self) -> None:
    """Build the compute pipeline that all emitters share.

    No particle buffers are created here; each emitter's SSBO is allocated
    on its first ``dispatch()``.
    """
    engine = self._engine
    device = engine.ctx.device

    # The compute set layout exposes a single storage buffer at binding 0,
    # visible to the compute stage only.
    ssbo_binding = (
        0,
        vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER,
        vk.VK_SHADER_STAGE_COMPUTE_BIT,
        1,
    )
    self._desc_layout = create_descriptor_set_layout(device, [ssbo_binding])

    pipeline, layout, module = create_compute_pipeline(
        device,
        engine.shader_dir / "particle_sim.comp",
        [self._desc_layout],
        _PUSH_CONSTANT_SIZE,
    )
    self._compute_pipeline = pipeline
    self._compute_layout = layout
    self._compute_module = module

    self._ready = True
    log.debug("Particle compute initialized (shared pipeline ready)")
# ------------------------------------------------------------------ slots
def _ensure_slot(self, emitter_id: int, max_particles: int) -> _EmitterSlot:
"""Return the slot for ``emitter_id``, creating or resizing as needed.
If ``max_particles`` exceeds the current capacity the SSBO is
rebuilt: the graphics descriptor set is invalidated so it gets
rewritten on next ``render()``.
"""
slot = self._emitters.get(emitter_id)
if slot is not None and slot.max_particles >= max_particles:
return slot
# Old slot (if any) goes: its capacity is too small. Free its
# resources before replacing so we don't leak the previous SSBO.
if slot is not None:
self._destroy_slot(slot)
e = self._engine
device = e.ctx.device
phys = e.ctx.physical_device
buf_size = max_particles * _PARTICLE_GPU_STRIDE
particle_buf, particle_mem = create_buffer(
device,
phys,
buf_size,
vk.VK_BUFFER_USAGE_STORAGE_BUFFER_BIT,
vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
)
# One descriptor pool per emitter so we can release the slot's GPU
# state by destroying the pool: no need for FREE_DESCRIPTOR_SET_BIT
# or per-set bookkeeping. Matches the web pass's per-emitter bind
# group ownership.
compute_pool = create_pool_for_types(
device, {vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER: 1},
)
compute_desc_set = allocate_descriptor_set(device, compute_pool, self._desc_layout)
with DescriptorWriteBatch(device) as batch:
batch.ssbo(compute_desc_set, 0, particle_buf, buf_size)
# Seed the SSBO: lifetime=1.0, age=999.0 → every particle is "dead"
# and respawns on the very first dispatch. Same convention as the web
# pass and the original single-emitter setup() code path.
seed = np.zeros(max_particles * 16, dtype=np.float32)
view = seed.reshape(max_particles, 16)
view[:, 13] = 1.0
view[:, 14] = 999.0
upload_numpy(device, particle_mem, seed)
slot = _EmitterSlot(
max_particles=max_particles,
particle_buf=particle_buf,
particle_mem=particle_mem,
compute_pool=compute_pool,
compute_desc_set=compute_desc_set,
)
self._emitters[emitter_id] = slot
log.debug("ParticleCompute: allocated slot id=%d max=%d", emitter_id, max_particles)
return slot
[docs]
def begin_frame(self) -> None:
    """Advance the frame stamp that marks which emitters are active."""
    self._frame_index = self._frame_index + 1
[docs]
def prune_inactive(self, active_ids: set[int]) -> None:
    """Release the slots of emitters that are absent from ``active_ids``.

    Intended to run once per frame after every emitter has been submitted,
    so that slots belonging to ``GPUParticles*`` nodes which left the tree
    are cleaned up. Counterpart of :func:`GPUParticlePass.pruneInactive` on
    the web side.
    """
    # Collect first, then remove: the dict must not change size while it is
    # being iterated.
    stale_ids = [eid for eid in self._emitters if eid not in active_ids]
    for eid in stale_ids:
        slot = self._emitters.pop(eid)
        self._destroy_slot(slot)
# ------------------------------------------------------------------ dispatch
[docs]
def dispatch(self, cmd: Any, dt: float, emitter_id: int, emitter_config: dict) -> None:
    """Record the simulation step for one emitter into ``cmd``.

    The emitter's slot is created (or grown) on demand and stamped as active
    for the current frame, then the compute shader is dispatched over the
    whole slot. A buffer barrier before the dispatch orders it after earlier
    compute-shader writes; a second barrier after it makes the results
    visible to vertex-shader reads.

    Nothing is recorded when ``setup()`` has not run or when the configured
    ``max_particles`` is not positive.

    Args:
        cmd: Active command buffer (must be outside a render pass).
        dt: Delta time in seconds.
        emitter_id: Stable per-node identifier (``id(node) & 0xFFFFFFFF``).
        emitter_config: See :meth:`GPUParticles3D.emitter_config`. Missing
            keys fall back to the defaults spelled out in the body.
    """
    if not self._ready:
        return
    max_particles = int(emitter_config.get("max_particles", 1024))
    if max_particles <= 0:
        return

    slot = self._ensure_slot(emitter_id, max_particles)
    slot.frame_counter += 1
    slot.last_active_frame = self._frame_index

    # Build push constants (112 bytes = 28 floats/uints); the slot indices
    # follow the layout table next to _PUSH_CONSTANT_SIZE.
    pc = np.zeros(28, dtype=np.float32)
    pos = emitter_config.get("emitter_pos", (0.0, 0.0, 0.0))
    pc[0:3] = pos
    pc[3] = dt
    grav = emitter_config.get("gravity", (0.0, -9.8, 0.0))
    pc[4:7] = grav
    pc[7] = float(emitter_config.get("damping", 0.0))
    vel = emitter_config.get("initial_velocity", (0.0, 5.0, 0.0))
    pc[8:11] = vel
    pc[11] = float(emitter_config.get("velocity_spread", 0.3))
    sc = emitter_config.get("start_colour", (1.0, 1.0, 1.0, 1.0))
    pc[12:16] = sc
    ec = emitter_config.get("end_colour", (1.0, 1.0, 1.0, 0.0))
    pc[16:20] = ec
    pc[20] = float(emitter_config.get("start_scale", 1.0))
    pc[21] = float(emitter_config.get("end_scale", 0.0))
    pc[22] = float(emitter_config.get("emission_radius", 1.0))

    # max_particles and frame_seed are uints in the shader: write them
    # through a uint32 view of the same memory instead of as float values.
    uint_view = pc.view(np.uint32)
    # The slot's capacity is used here, not the requested count. Slots never
    # shrink, so the capacity can be larger than this frame's request.
    uint_view[23] = slot.max_particles
    # The per-slot counter grows without bound; wrap it to 32 bits so the
    # store cannot overflow the uint32 element on a long-running emitter.
    uint_view[24] = slot.frame_counter & 0xFFFFFFFF
    # pad slots 25-27 are already zero
    pc_bytes = pc.tobytes()
    ffi = vk.ffi
    # cffi-owned copy of the packed bytes, handed to the raw binding below.
    cbuf = ffi.new("char[]", pc_bytes)

    # Memory barrier: ensure the previous dispatch's writes are visible.
    barrier = ffi.new("VkBufferMemoryBarrier*")
    barrier.sType = vk.VK_STRUCTURE_TYPE_BUFFER_MEMORY_BARRIER
    barrier.srcAccessMask = vk.VK_ACCESS_SHADER_WRITE_BIT
    barrier.dstAccessMask = vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT
    barrier.srcQueueFamilyIndex = vk.VK_QUEUE_FAMILY_IGNORED
    barrier.dstQueueFamilyIndex = vk.VK_QUEUE_FAMILY_IGNORED
    barrier.buffer = slot.particle_buf
    barrier.offset = 0
    barrier.size = _VK_WHOLE_SIZE_U64
    vk.vkCmdPipelineBarrier(
        cmd,
        vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
        vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
        0,
        0,
        None,
        1,
        [barrier[0]],
        0,
        None,
    )

    # Bind the shared compute pipeline + this emitter's descriptor set.
    vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._compute_pipeline)
    vk.vkCmdBindDescriptorSets(
        cmd,
        vk.VK_PIPELINE_BIND_POINT_COMPUTE,
        self._compute_layout,
        0,
        1,
        [slot.compute_desc_set],
        0,
        None,
    )
    vk._vulkan.lib.vkCmdPushConstants(
        cmd,
        self._compute_layout,
        vk.VK_SHADER_STAGE_COMPUTE_BIT,
        0,
        len(pc_bytes),
        cbuf,
    )

    # Dispatch enough workgroups to cover this emitter's particles
    # (ceiling division by the workgroup size).
    group_count = (slot.max_particles + _WORKGROUP_SIZE - 1) // _WORKGROUP_SIZE
    vk.vkCmdDispatch(cmd, group_count, 1, 1)

    # Barrier: compute writes -> vertex shader reads. The same barrier
    # struct is reused with only the access masks changed.
    barrier.srcAccessMask = vk.VK_ACCESS_SHADER_WRITE_BIT
    barrier.dstAccessMask = vk.VK_ACCESS_SHADER_READ_BIT
    vk.vkCmdPipelineBarrier(
        cmd,
        vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
        vk.VK_PIPELINE_STAGE_VERTEX_SHADER_BIT,
        0,
        0,
        None,
        1,
        [barrier[0]],
        0,
        None,
    )
# ------------------------------------------------------------------ render
[docs]
def render(
    self,
    cmd: Any,
    particle_pass: Any,
    view_proj: np.ndarray,
    camera_right: np.ndarray,
    camera_up: np.ndarray,
    extent: tuple[int, int],
) -> None:
    """Draw the particles of every emitter dispatched this frame.

    Uses the billboard pipeline owned by ``particle_pass``. A slot is drawn
    only if its ``last_active_frame`` equals the current frame stamp. Each
    slot's compute-owned SSBO is bound through a per-slot descriptor set
    created against ``ParticlePass._ssbo_layout`` on first use, so no
    per-frame upload takes place.

    Returns without recording anything when this object or
    ``particle_pass`` is not ready.
    """
    if not self._ready:
        return
    if particle_pass is None or not particle_pass._ready:
        return

    device = self._engine.ctx.device

    # Viewport, scissor and pipeline are set once for the whole pass.
    viewport = vk.VkViewport(
        x=0.0,
        y=0.0,
        width=float(extent[0]),
        height=float(extent[1]),
        minDepth=0.0,
        maxDepth=1.0,
    )
    vk.vkCmdSetViewport(cmd, 0, 1, [viewport])
    full_rect = vk.VkRect2D(
        offset=vk.VkOffset2D(x=0, y=0),
        extent=vk.VkExtent2D(width=extent[0], height=extent[1]),
    )
    vk.vkCmdSetScissor(cmd, 0, 1, [full_rect])
    vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, particle_pass._pipeline)

    # Push constants, 96 bytes, same layout as ParticlePass:
    #   floats  0-15: mat4 view_proj (transposed before flattening)
    #   floats 16-18: vec3 camera_right, float 19 is padding
    #   floats 20-22: vec3 camera_up,    float 23 is padding
    push = np.zeros(24, dtype=np.float32)
    push[:16] = view_proj.T.ravel()
    push[16:19] = camera_right
    push[20:23] = camera_up
    push_bytes = push.tobytes()
    push_cdata = vk.ffi.new("char[]", push_bytes)
    vk._vulkan.lib.vkCmdPushConstants(
        cmd,
        particle_pass._pipeline_layout,
        vk.VK_SHADER_STAGE_VERTEX_BIT,
        0,
        len(push_bytes),
        push_cdata,
    )

    for slot in self._emitters.values():
        if slot.last_active_frame == self._frame_index:
            if slot.graphics_desc_set is None:
                # First draw of this slot (or first after a resize replaced
                # it): build the graphics-side descriptor set for its SSBO.
                pool, desc_set = _allocate_graphics_ssbo_set(
                    device,
                    particle_pass._ssbo_layout,
                    slot.particle_buf,
                    slot.max_particles * _PARTICLE_GPU_STRIDE,
                )
                slot.graphics_pool = pool
                slot.graphics_desc_set = desc_set
            vk.vkCmdBindDescriptorSets(
                cmd,
                vk.VK_PIPELINE_BIND_POINT_GRAPHICS,
                particle_pass._pipeline_layout,
                0,
                1,
                [slot.graphics_desc_set],
                0,
                None,
            )
            # Six vertices are drawn per particle.
            vk.vkCmdDraw(cmd, slot.max_particles * 6, 1, 0, 0)
# ------------------------------------------------------------------ misc
[docs]
def get_particle_ssbo(self, emitter_id: int) -> Any:
    """Look up the particle SSBO owned by ``emitter_id``.

    Returns ``None`` when no slot has been allocated for that id.
    """
    slot = self._emitters.get(emitter_id)
    if not slot:
        return None
    return slot.particle_buf
[docs]
def upload_initial_particles(self, emitter_id: int, particles: np.ndarray) -> None:
    """Upload CPU-generated particle data into one emitter's GPU buffer.

    Meant for tests and tools that need a deterministic starting state.
    Does nothing when the emitter has no slot, when ``setup()`` has not run,
    or when there is nothing to upload. If ``particles`` holds more entries
    than the slot's ``max_particles``, only the first ``max_particles`` are
    uploaded; the rest are silently dropped.
    """
    slot = self._emitters.get(emitter_id)
    if slot is None:
        return
    if not self._ready:
        return

    # Clamp to the slot capacity so the upload cannot run past the SSBO.
    n_upload = min(len(particles), slot.max_particles)
    if n_upload != 0:
        upload_numpy(self._engine.ctx.device, slot.particle_mem, particles[:n_upload])
        log.debug("Uploaded %d initial particles to GPU (emitter %d)", n_upload, emitter_id)
[docs]
def emitter_count(self) -> int:
    """Return how many emitter slots are currently allocated (for tests / debugging)."""
    slots = self._emitters
    return len(slots)
[docs]
def has_emitter(self, emitter_id: int) -> bool:
    """Report whether a slot is currently allocated for ``emitter_id``."""
    known = self._emitters
    return emitter_id in known
[docs]
@property
def ready(self) -> bool:
    """Whether ``setup()`` has completed and ``cleanup()`` has not run since."""
    is_ready = self._ready
    return is_ready
# ------------------------------------------------------------------ cleanup
def _destroy_slot(self, slot: _EmitterSlot) -> None:
device = self._engine.ctx.device
if slot.graphics_pool is not None:
vk.vkDestroyDescriptorPool(device, slot.graphics_pool, None)
if slot.compute_pool is not None:
vk.vkDestroyDescriptorPool(device, slot.compute_pool, None)
if slot.particle_buf is not None:
vk.vkDestroyBuffer(device, slot.particle_buf, None)
if slot.particle_mem is not None:
vk.vkFreeMemory(device, slot.particle_mem, None)
[docs]
def cleanup(self) -> None:
    """Tear down all emitter slots, then the shared pipeline objects.

    Does nothing unless ``setup()`` has completed. Afterwards the object
    reports itself as not ready.
    """
    if not self._ready:
        return
    device = self._engine.ctx.device

    # Per-emitter resources go first.
    for slot in self._emitters.values():
        self._destroy_slot(slot)
    self._emitters.clear()

    # Shared objects, each paired with the Vulkan call that destroys it.
    shared_objects = (
        (self._compute_pipeline, vk.vkDestroyPipeline),
        (self._compute_layout, vk.vkDestroyPipelineLayout),
        (self._compute_module, vk.vkDestroyShaderModule),
        (self._desc_layout, vk.vkDestroyDescriptorSetLayout),
    )
    for handle, destroy in shared_objects:
        if not handle:
            continue
        destroy(device, handle, None)

    self._ready = False
    log.debug("Particle compute resources cleaned up")
def _allocate_graphics_ssbo_set(device: Any, layout: Any, buf: Any, size: int) -> tuple[Any, Any]:
    """Create a one-set descriptor pool and a set that binds ``buf`` as an SSBO.

    :meth:`ParticleCompute.render` uses this to expose the compute-owned
    particle buffer to the graphics particle pipeline without touching
    ParticlePass's own descriptor set (which references the CPU-uploaded
    buffer).

    Args:
        device: Vulkan device the pool and set are created on.
        layout: Descriptor set layout the set is allocated with.
        buf: Storage buffer written to binding 0.
        size: Byte range of ``buf`` exposed through the binding.

    Returns:
        ``(pool, set)``; the caller owns the lifetime of both.
    """
    pool_sizes = {vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER: 1}
    ssbo_pool = create_pool_for_types(device, pool_sizes)
    ssbo_set = allocate_descriptor_set(device, ssbo_pool, layout)
    with DescriptorWriteBatch(device) as writes:
        writes.ssbo(ssbo_set, 0, buf, size)
    return (ssbo_pool, ssbo_set)