Source code for simvx.graphics.renderer.velocity_pass

"""Per-object motion-vector (velocity) pass for TAA (desktop Vulkan).

Foundation stage for per-object TAA: produces a per-pixel velocity buffer for
opaque MESH INSTANCES so a moving mesh stops ghosting once the resolve samples
it (the resolve hookup is a separate stage). Purely additive and gated on
``taa_enabled``: nothing here is allocated, uploaded, or recorded when TAA is
off, so the no-TAA frame stays byte-identical.

How it works
------------
A dedicated RG16F render target (with its own depth) is re-drawn from the same
opaque instances the forward pass submitted. ``velocity.vert`` reads two parallel
model-matrix SSBOs keyed by ``gl_InstanceIndex`` (current + previous frame) and,
together with the UNJITTERED current/previous view-projection (from
``PostProcessPass.taa_cur_vp`` / ``taa_prev_vp``), emits NDC-space motion vectors.

Previous-frame transforms are kept on the CPU side: each frame the renderer hands
this pass the column-major model matrices it already computed for the main
transform SSBO. We upload them as "current", and roll last frame's into "previous".
On the first frame -- or whenever the scene tree structure changes (instance row
indices may shift) -- prev is forced equal to current, yielding zero velocity (no
spike), mirroring the camera-VP first-frame guard in ``update_taa_matrices``.

Background / sky pixels keep the render pass's clear sentinel ``(-2, -2)`` (outside
any legitimate NDC-delta range) so the resolve can fall back to depth-based camera
reprojection there.

SCOPE: opaque mesh instances only. Skinned meshes and GPU particles have no
prev-joint / prev-particle state at this boundary and are intentionally NOT drawn
here -- they keep the resolve's camera-only depth-reproject behaviour (mild
self-motion ghosting, no regression). Per-object velocity for those is a flagged
follow-up.
"""

import logging
from typing import Any

import numpy as np
import vulkan as vk

from ..gpu.descriptors import (
    DescriptorWriteBatch,
    allocate_descriptor_set,
    create_descriptor_set_layout,
)
from ..gpu.memory import create_buffer, create_sampler, upload_numpy
from ..gpu.pipeline import (
    FORWARD_VERTEX_ATTRS,
    FORWARD_VERTEX_STRIDE,
    PipelineSpec,
    build_pipeline,
    create_shader_module,
)
from ..materials.shader_compiler import compile_shader
from .render_target import RenderTarget

__all__ = ["VelocityPass"]

log = logging.getLogger(__name__)

# Colour format of the velocity attachment: two signed half-float channels
# (RG16F) storing the NDC-space motion vector, i.e. current NDC - previous NDC.
_VELOCITY_FORMAT = vk.VK_FORMAT_R16G16_SFLOAT

# Value the colour attachment is cleared to. A real NDC delta lies in [-2, 2],
# and the resolve treats ``vel.x < -1.5`` as "no per-object velocity here ->
# use the depth fallback", so (-2, -2) marks pixels no instance covered.
_CLEAR_SENTINEL = (-2.0, -2.0)

# Bytes per instance row in the model SSBOs: one column-major (GLSL-ready) mat4.
_MODEL_STRIDE = 64

# Uniform block size in bytes: cur_vp (mat4, 64) followed by prev_vp (mat4, 64).
_UBO_SIZE = 64 + 64

# Memory properties requested for every buffer this pass creates.
_HOST_FLAGS = (
    vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT
    | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT
)
# Usage flag for the current/previous model-matrix storage buffers.
_SSBO_USAGE = vk.VK_BUFFER_USAGE_STORAGE_BUFFER_BIT

class VelocityPass:
    """Re-draws opaque mesh instances into an RG16F per-object velocity target.

    Typical call order: construct (no GPU work) -> :meth:`setup` once TAA is
    enabled -> per frame :meth:`set_frame_matrices`, :meth:`upload_models`,
    :meth:`render` -> :meth:`resize` when the output size changes ->
    :meth:`cleanup`. Every per-frame entry point is a no-op until
    :meth:`setup` has completed.
    """

    def __init__(self, engine: Any, max_objects: int) -> None:
        """Record configuration only; no Vulkan object is created here.

        Args:
            engine: Owning engine. This class reads ``engine.ctx`` (device,
                physical device, graphics queue, command pool) and
                ``engine.shader_dir`` from it.
            max_objects: Number of mat4 rows each model SSBO is sized for.
        """
        self._engine = engine
        self._max_objects = max_objects
        self._ready = False
        self._width = 0
        self._height = 0

        self._target: RenderTarget | None = None

        # Two parallel model-matrix SSBOs (column-major mat4 per instance).
        self._cur_buf: Any = None
        self._cur_mem: Any = None
        self._prev_buf: Any = None
        self._prev_mem: Any = None
        # Last frame's column-major models, cached CPU-side so we can upload them
        # as "previous" next frame without a GPU copy. None until the first upload.
        self._last_models: np.ndarray | None = None

        self._ubo_buf: Any = None
        self._ubo_mem: Any = None
        self._sampler: Any = None

        self._desc_layout: Any = None
        self._desc_pool: Any = None
        self._desc_set: Any = None

        self._pipeline: Any = None
        self._pipeline_double: Any = None  # double-sided (cull off) variant
        self._pipeline_layout: Any = None
        self._vert_module: Any = None
        self._frag_module: Any = None

        # Per-frame matrices (set by the renderer before render()).
        self._cur_vp = np.eye(4, dtype=np.float32)
        self._prev_vp = np.eye(4, dtype=np.float32)
        # How many instance rows are live this frame (drives draw range bounds).
        self._instance_count = 0

    # -- views ----------------------------------------------------------------

    @property
    def velocity_view(self) -> Any:
        """RG16F colour view the resolve samples (SHADER_READ_ONLY after the pass).

        Returns ``None`` while no render target exists.
        """
        return self._target.colour_view if self._target else None

    @property
    def ready(self) -> bool:
        """True once :meth:`setup` has completed and until :meth:`cleanup` runs."""
        return self._ready

    # -- setup ----------------------------------------------------------------

    def setup(self, width: int, height: int) -> None:
        """Allocate the velocity target, prev/cur model SSBOs, UBO, descriptors, pipelines.

        Called lazily by the renderer only when TAA is first enabled, so the
        no-TAA path never touches any of this.

        Args:
            width: Velocity target width in pixels.
            height: Velocity target height in pixels.
        """
        e = self._engine
        device = e.ctx.device
        phys = e.ctx.physical_device
        self._width, self._height = width, height

        # RG16F colour + its own depth (opaque depth-test: nearest surface wins).
        self._target = RenderTarget(
            device, phys, width, height,
            colour_format=_VELOCITY_FORMAT, use_depth=True,
            queue=e.ctx.graphics_queue, command_pool=e.ctx.command_pool,
        )

        # Both model SSBOs are sized for max_objects mat4 rows.
        model_size = self._max_objects * _MODEL_STRIDE
        self._cur_buf, self._cur_mem = create_buffer(device, phys, model_size, _SSBO_USAGE, _HOST_FLAGS)
        self._prev_buf, self._prev_mem = create_buffer(device, phys, model_size, _SSBO_USAGE, _HOST_FLAGS)
        self._ubo_buf, self._ubo_mem = create_buffer(
            device, phys, _UBO_SIZE, vk.VK_BUFFER_USAGE_UNIFORM_BUFFER_BIT, _HOST_FLAGS,
        )

        # All three bindings are visible to the vertex stage only.
        vs = vk.VK_SHADER_STAGE_VERTEX_BIT
        self._desc_layout = create_descriptor_set_layout(device, [
            (0, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, vs, 1),  # current models
            (1, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, vs, 1),  # previous models
            (2, vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER, vs, 1),  # cur_vp + prev_vp
        ])
        # Pool holds exactly the one set this pass needs: 2 SSBOs + 1 UBO.
        pool_sizes = [
            vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, descriptorCount=2),
            vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER, descriptorCount=1),
        ]
        self._desc_pool = vk.vkCreateDescriptorPool(device, vk.VkDescriptorPoolCreateInfo(
            maxSets=1, poolSizeCount=len(pool_sizes), pPoolSizes=pool_sizes,
        ), None)
        self._desc_set = allocate_descriptor_set(device, self._desc_pool, self._desc_layout)

        with DescriptorWriteBatch(device) as batch:
            batch.ssbo(self._desc_set, 0, self._cur_buf, model_size)
            batch.ssbo(self._desc_set, 1, self._prev_buf, model_size)
            batch.uniform_buffer(self._desc_set, 2, self._ubo_buf, _UBO_SIZE)

        # NOTE(review): the sampler is not bound in this pass's descriptor set;
        # presumably it exists for whoever samples ``velocity_view`` -- confirm.
        self._sampler = create_sampler(device)

        shader_dir = e.shader_dir
        self._vert_module = create_shader_module(device, compile_shader(shader_dir / "velocity.vert"))
        self._frag_module = create_shader_module(device, compile_shader(shader_dir / "velocity.frag"))
        self._pipeline = self._create_pipeline((width, height), double_sided=False)
        self._pipeline_double = self._create_pipeline((width, height), double_sided=True)

        # No history yet: the first upload pairs prev with cur (zero velocity).
        self._last_models = None
        self._ready = True
        log.debug("Velocity pass initialised (%dx%d)", width, height)

    # -- per-frame data -------------------------------------------------------

    def set_frame_matrices(self, cur_vp: np.ndarray, prev_vp: np.ndarray) -> None:
        """Stash the UNJITTERED current + previous view-projection for this frame.

        The matrices are converted to float32 and written to the UBO by the
        next :meth:`render` call.
        """
        self._cur_vp = np.asarray(cur_vp, dtype=np.float32)
        self._prev_vp = np.asarray(prev_vp, dtype=np.float32)

    def upload_models(self, models_col_major: np.ndarray, structure_changed: bool) -> None:
        """Upload current model matrices and roll the previous-frame copy.

        ``models_col_major`` is the (N, 4, 4) column-major (GLSL-ready) model
        matrices the renderer already computed for the main transform SSBO --
        the same row order, so ``gl_InstanceIndex`` pairs current<->previous
        correctly.

        On the first frame (no cached prior models), after a tree-structure
        change (row indices may have shifted -> a stale prev would spike), or
        when the row count differs from last frame, prev is set equal to
        current so velocity is zero that frame.

        Args:
            models_col_major: Per-instance model matrices for this frame.
            structure_changed: True when instance row indices may have moved
                since the previous call.
        """
        if not self._ready:
            return

        # Always take a private, C-contiguous float32 copy. A no-copy conversion
        # would hand back the caller's own array when it is already contiguous
        # float32; caching that as "previous" would alias a buffer the renderer
        # may overwrite in place next frame, making prev == cur (zero velocity)
        # for every moving object.
        cur = np.array(models_col_major, dtype=np.float32, order="C", ndmin=1)
        n = cur.shape[0]
        self._instance_count = n
        if n == 0:
            self._last_models = cur
            return
        # NOTE(review): n is not checked against max_objects (the SSBO row
        # capacity) -- confirm the caller never exceeds it.

        history = self._last_models
        if history is None or structure_changed or history.shape[0] != n:
            prev = cur  # no trustworthy history: force zero velocity this frame
        else:
            prev = history

        device = self._engine.ctx.device
        upload_numpy(device, self._cur_mem, cur)
        upload_numpy(device, self._prev_mem, prev)  # already contiguous float32
        self._last_models = cur

    def _upload_uniforms(self) -> None:
        """Write cur_vp followed by prev_vp (16 floats each) into the UBO."""
        # row-major numpy -> column-major GLSL (transpose).
        data = np.empty(_UBO_SIZE // 4, dtype=np.float32)
        data[0:16] = self._cur_vp.T.ravel()
        data[16:32] = self._prev_vp.T.ravel()
        upload_numpy(self._engine.ctx.device, self._ubo_mem, data)

    # -- record ---------------------------------------------------------------

    def render(self, cmd: Any, scene_renderer: Any) -> None:
        """Re-draw opaque instances into the velocity target. No-op when not ready.

        Reuses the scene content renderer's opaque draw path with this pass's
        pipeline/layout/descriptor overrides, so culling + grouping + the
        indirect batch are shared with the forward pass (no second Python draw
        loop).

        Args:
            cmd: Command buffer the render pass is recorded into.
            scene_renderer: Object providing ``render_velocity(cmd, pass)``.
        """
        # NOTE(review): with zero instances the target is not re-cleared, so it
        # keeps the previous frame's contents -- confirm the resolve tolerates it.
        if not self._ready or self._instance_count == 0:
            return
        self._upload_uniforms()

        rt = self._target
        rp_begin = vk.VkRenderPassBeginInfo(
            renderPass=rt.render_pass,
            framebuffer=rt.framebuffer,
            renderArea=vk.VkRect2D(
                offset=vk.VkOffset2D(x=0, y=0),
                extent=vk.VkExtent2D(width=rt.width, height=rt.height),
            ),
            clearValueCount=2,
            pClearValues=[
                # Colour: sentinel in RG marks "no per-object velocity here".
                vk.VkClearValue(color=vk.VkClearColorValue(
                    float32=[_CLEAR_SENTINEL[0], _CLEAR_SENTINEL[1], 0.0, 0.0])),
                # Depth: far plane, paired with the pipelines' LESS compare.
                vk.VkClearValue(depthStencil=vk.VkClearDepthStencilValue(depth=1.0, stencil=0)),
            ],
        )
        vk.vkCmdBeginRenderPass(cmd, rp_begin, vk.VK_SUBPASS_CONTENTS_INLINE)
        scene_renderer.render_velocity(cmd, self)
        vk.vkCmdEndRenderPass(cmd)

    # -- pipeline -------------------------------------------------------------

    def _create_pipeline(self, extent: tuple[int, int], *, double_sided: bool) -> Any:
        """Build a velocity pipeline (own push-free layout, set 0 = our desc set).

        Declares its fixed-function state via :class:`PipelineSpec` and defers
        all cffi plumbing (and lifetime) to :func:`build_pipeline`. The shaders
        are compiled at runtime, so the pre-created modules are passed directly.

        Push constants are unused (matrices come via the UBO), so the layout has
        a single descriptor set and no push range -- independent of the forward
        pipeline's full push budget. Both variants share one layout: the first
        call records it, the second reuses it and the duplicate is dropped.

        Args:
            extent: ``(width, height)`` the pipeline is built for.
            double_sided: When True, back-face culling is disabled.

        Returns:
            The created pipeline handle.
        """
        spec = PipelineSpec(
            name=f"Velocity (double_sided={double_sided})",
            topology=vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST,
            # Same pos/normal/uv vertex layout as the forward pipeline:
            # we reuse the existing mesh vertex buffers verbatim.
            vertex_stride=FORWARD_VERTEX_STRIDE,
            vertex_attrs=FORWARD_VERTEX_ATTRS,
            cull_mode=vk.VK_CULL_MODE_NONE if double_sided else vk.VK_CULL_MODE_BACK_BIT,
            depth_test=True,
            depth_write=True,
            depth_compare=vk.VK_COMPARE_OP_LESS,  # opaque: nearest surface wins
            blend="opaque",  # single RG16F attachment
            set_layouts=(self._desc_layout,),
        )
        device = self._engine.ctx.device
        pipeline, layout = build_pipeline(
            device, spec, self._target.render_pass, extent,
            vert_module=self._vert_module, frag_module=self._frag_module,
        )
        if self._pipeline_layout is None:
            self._pipeline_layout = layout
        else:
            # Both variants need byte-identical layouts; keep the first and free
            # this duplicate so cleanup destroys exactly one layout.
            vk.vkDestroyPipelineLayout(device, layout, None)
        return pipeline

    # accessors for the scene renderer's override path

    @property
    def pipeline(self) -> Any:
        """Back-face-culled velocity pipeline (``None`` before setup)."""
        return self._pipeline

    @property
    def pipeline_double(self) -> Any:
        """Cull-disabled velocity pipeline for double-sided geometry."""
        return self._pipeline_double

    @property
    def pipeline_layout(self) -> Any:
        """Pipeline layout shared by both velocity pipeline variants."""
        return self._pipeline_layout

    @property
    def desc_set(self) -> Any:
        """Descriptor set holding the cur/prev model SSBOs and the VP UBO."""
        return self._desc_set

    # -- resize / cleanup -----------------------------------------------------

    def resize(self, width: int, height: int) -> None:
        """Recreate the velocity target and both pipelines at a new size.

        No-op until :meth:`setup` has run. Buffers, descriptors, shader modules
        and the pipeline layout are kept.
        """
        if not self._ready:
            return
        self._width, self._height = width, height
        if self._target:
            self._target.destroy()
        e = self._engine
        self._target = RenderTarget(
            e.ctx.device, e.ctx.physical_device, width, height,
            colour_format=_VELOCITY_FORMAT, use_depth=True,
            queue=e.ctx.graphics_queue, command_pool=e.ctx.command_pool,
        )
        # Render pass handle changed: rebuild pipelines against the new pass.
        device = e.ctx.device
        if self._pipeline:
            vk.vkDestroyPipeline(device, self._pipeline, None)
        if self._pipeline_double:
            vk.vkDestroyPipeline(device, self._pipeline_double, None)
        self._pipeline = self._create_pipeline((width, height), double_sided=False)
        self._pipeline_double = self._create_pipeline((width, height), double_sided=True)
        # History of prev models is resolution-independent, but a resize is a good
        # moment to drop it so we don't pair stale rows after a scene swap.
        self._last_models = None

    def cleanup(self) -> None:
        """Destroy every Vulkan object owned by this pass and reset its state.

        No-op when the pass was never set up. Afterwards all handles are
        ``None`` (so the accessors no longer expose freed handles) and
        :meth:`setup` may be called again.
        """
        if not self._ready:
            return
        device = self._engine.ctx.device

        for pipe in (self._pipeline, self._pipeline_double):
            if pipe:
                vk.vkDestroyPipeline(device, pipe, None)
        if self._pipeline_layout:
            vk.vkDestroyPipelineLayout(device, self._pipeline_layout, None)
        for mod in (self._vert_module, self._frag_module):
            if mod:
                vk.vkDestroyShaderModule(device, mod, None)
        if self._desc_pool:
            # Destroying the pool also releases the set allocated from it.
            vk.vkDestroyDescriptorPool(device, self._desc_pool, None)
        if self._desc_layout:
            vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None)
        if self._sampler:
            vk.vkDestroySampler(device, self._sampler, None)
        for buf, mem in (
            (self._cur_buf, self._cur_mem),
            (self._prev_buf, self._prev_mem),
            (self._ubo_buf, self._ubo_mem),
        ):
            if buf:
                vk.vkDestroyBuffer(device, buf, None)
            if mem:
                vk.vkFreeMemory(device, mem, None)
        if self._target:
            self._target.destroy()
            self._target = None

        # Drop every handle so nothing dangling survives teardown.
        self._pipeline = self._pipeline_double = self._pipeline_layout = None
        self._vert_module = self._frag_module = None
        self._desc_pool = self._desc_layout = self._desc_set = None
        self._sampler = None
        self._cur_buf = self._cur_mem = None
        self._prev_buf = self._prev_mem = None
        self._ubo_buf = self._ubo_mem = None
        # Per-frame state must not leak into a later setup().
        self._last_models = None
        self._instance_count = 0
        self._ready = False