Source code for simvx.graphics.renderer.occlusion_cull

"""GPU Hi-Z occlusion cull pass (phase O3).

Patches per-instance ``instance_count`` in an indirect draw buffer IN PLACE:
0 to cull, 1 to keep. Batching guarantees one command per instance, so zeroing a
command drops exactly that one object. CONSERVATIVE: an object is culled only
when its nearest screen depth is strictly farther than the Hi-Z (MAX) occluder
footprint; every degeneracy keeps the object. See ``occlusion_cull.comp``.

Single-phase against LAST frame's Hi-Z pyramid: the cull runs BEFORE the forward
pass (compute is illegal inside a render pass), consuming the pyramid the prior
frame built. Fully gated by ``Renderer._occlusion_culling_enabled`` and by a
``hiz_built_once`` flag (skip until a pyramid exists), so the default path
allocates and dispatches nothing.
"""

import logging
from typing import Any

import numpy as np
import vulkan as vk

from ..types import AABB_DTYPE, INDIRECT_DRAW_DTYPE, TRANSFORM_DTYPE
from ..gpu.descriptors import (
    DescriptorWriteBatch,
    allocate_descriptor_set,
    create_descriptor_set_layout,
    create_pool_for_types,
)
from ..gpu.pipeline_compute import create_compute_pipeline

# Public API of this module: only the two-phase pass is exported.
__all__ = ["OcclusionTwoPhasePass"]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Invocations per workgroup assumed when sizing the dispatch: ``dispatch``
# launches ceil(draw_count / _LOCAL_SIZE) groups along X.
# NOTE(review): presumably mirrors ``local_size_x`` in the compute shader — confirm.
_LOCAL_SIZE = 64

# Two-phase push constant: mat4 view_proj(64) + ivec2 base_extent(8) + int mip(4)
# + int draw_count(4) + int phase(4) + int skip_cull(4) = 88.
# ``dispatch`` packs the fields in exactly this order (the ``mip`` slot carries
# its ``mip_count`` argument); the same size is used for the pipeline layout's
# push-constant range in ``setup``.
_PC2_SIZE = 88


class OcclusionTwoPhasePass:
    """Two-phase Hi-Z occlusion cull: GPU phase-1 selection + phase-2 cull.

    Reuses ``occlusion_two_phase.comp`` (one shader, dispatched twice). Owns a
    descriptor set per ``(indirect buffer, frame parity)``: bindings 4/5
    (vis_prev, vis_next) swap each frame, so a fresh set is allocated for each
    parity and then cached. Bindings 0-3 (draws, transforms, aabbs, hi-z) match
    the single-phase layout. See the shader header for the conservative
    invariant.

    Lifecycle: construct, call :meth:`setup` once, record phases with
    :meth:`dispatch`, and call :meth:`cleanup` before the device is destroyed.

    NOTE: the descriptor-set cache is keyed only by the indirect buffer handle
    and the parity. A cached set keeps referencing the transform/AABB/Hi-Z/
    visibility resources that were passed when it was first created.
    """

    def __init__(self, engine: Any, max_sets: int = 16) -> None:
        """Store configuration only; no Vulkan objects are created here.

        Args:
            engine: Object exposing ``ctx.device`` and ``shader_dir``.
            max_sets: Capacity of the descriptor pool, i.e. the maximum number
                of distinct ``(indirect buffer, parity)`` pairs.
        """
        self._engine = engine
        self._ready = False
        self._max_sets = max_sets
        self._pipeline: Any = None
        self._layout: Any = None
        self._module: Any = None
        self._desc_layout: Any = None
        self._desc_pool: Any = None
        # Keyed by (indirect buffer ptr, parity) -> descriptor set.
        self._sets: dict[tuple[int, int], Any] = {}

    def setup(self) -> None:
        """Create the descriptor set layout, descriptor pool and compute pipeline.

        A second call while already set up is a no-op, so live handles are
        never overwritten (and leaked). If creation raises part-way, the
        handles created so far stay recorded on the instance and are released
        by :meth:`cleanup`.
        """
        if self._ready:
            return
        device = self._engine.ctx.device
        cs = vk.VK_SHADER_STAGE_COMPUTE_BIT
        self._desc_layout = create_descriptor_set_layout(
            device,
            [
                (0, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, cs, 1),  # indirect draws (rw)
                (1, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, cs, 1),  # transforms
                (2, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, cs, 1),  # aabbs
                (3, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, cs, 1),  # hi-z
                (4, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, cs, 1),  # vis_prev (read)
                (5, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, cs, 1),  # vis_next (write)
            ],
        )
        # Five storage buffers and one sampled image per set (see layout above).
        self._desc_pool = create_pool_for_types(
            device,
            {
                vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER: 5 * self._max_sets,
                vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER: self._max_sets,
            },
            max_sets=self._max_sets,
        )
        self._pipeline, self._layout, self._module = create_compute_pipeline(
            device,
            self._engine.shader_dir / "occlusion_two_phase.comp",
            [self._desc_layout],
            _PC2_SIZE,
        )
        self._ready = True

    def _set_for(
        self,
        indirect_buffer: Any,
        transform_buf: Any,
        aabb_buf: Any,
        hiz_view: Any,
        hiz_sampler: Any,
        vis_prev_buf: Any,
        vis_next_buf: Any,
        parity: int,
        indirect_size: int,
        transform_size: int,
        aabb_size: int,
        vis_size: int,
    ) -> Any:
        """Return the descriptor set for ``(indirect_buffer, parity)``.

        On a cache miss a set is allocated from the pool, all six bindings are
        written, and the set is cached. On a hit the cached set is returned
        unchanged; the other arguments are ignored in that case.
        """
        key = (int(vk.ffi.cast("uintptr_t", indirect_buffer)), parity)
        ds = self._sets.get(key)
        if ds is not None:
            return ds
        device = self._engine.ctx.device
        ds = allocate_descriptor_set(device, self._desc_pool, self._desc_layout)
        with DescriptorWriteBatch(device) as b:
            b.ssbo(ds, 0, indirect_buffer, indirect_size)
            b.ssbo(ds, 1, transform_buf, transform_size)
            b.ssbo(ds, 2, aabb_buf, aabb_size)
            b.image(ds, 3, hiz_view, hiz_sampler, image_layout=vk.VK_IMAGE_LAYOUT_GENERAL)
            b.ssbo(ds, 4, vis_prev_buf, vis_size)
            b.ssbo(ds, 5, vis_next_buf, vis_size)
        self._sets[key] = ds
        return ds

    def dispatch(
        self,
        cmd: Any,
        *,
        phase: int,
        skip_cull: bool,
        indirect_buffer: Any,
        draw_count: int,
        transform_buf: Any,
        aabb_buf: Any,
        hiz_view: Any,
        hiz_sampler: Any,
        vis_prev_buf: Any,
        vis_next_buf: Any,
        parity: int,
        view_proj: np.ndarray,
        base_extent: tuple[int, int],
        mip_count: int,
        max_objects: int,
        host_barrier: bool = False,
    ) -> None:
        """Record one phase of the two-phase cull.

        ``phase`` 1 = selection (seed phase-1 batch + vis_next from vis_prev),
        2 = cull (test set B against the fresh Hi-Z, write final instance_count
        + vis_next). The caller guarantees this runs OUTSIDE any render pass.

        Barriers: ``host_barrier`` (phase-1 only) makes the CPU indirect/
        transform/aabb/visibility uploads visible. After the dispatch a buffer
        barrier makes the patched ``instance_count`` visible to the indirect
        draw that reads it (phase 1 -> depth prepass, phase 2 -> colour pass).

        Nothing is recorded when the pass is not set up or when ``draw_count``
        is not positive.

        Args:
            cmd: Command buffer being recorded.
            phase: 1 or 2, forwarded to the shader.
            skip_cull: Forwarded to the shader as 0/1.
            indirect_buffer: Indirect draw buffer patched in place.
            draw_count: Number of draw commands to process.
            transform_buf, aabb_buf: Per-object input buffers.
            hiz_view, hiz_sampler: Hi-Z pyramid view and its sampler.
            vis_prev_buf, vis_next_buf: Visibility buffers (read / write).
            parity: Frame parity selecting the cached descriptor set.
            view_proj: 4x4 view-projection matrix; it is transposed before
                upload.
            base_extent: Forwarded to the shader as ``base_extent``.
            mip_count: Forwarded to the shader's ``mip`` slot.
            max_objects: Object capacity used to size the descriptor ranges.
            host_barrier: Emit a HOST -> COMPUTE memory barrier first.

        Raises:
            ValueError: If the packed push constants are not ``_PC2_SIZE``
                bytes (i.e. ``view_proj`` is not a 4x4 matrix).
        """
        # Non-positive counts would yield a non-positive workgroup count and a
        # negative barrier size, so treat them like "nothing to do".
        if not self._ready or draw_count <= 0:
            return

        # Pack and validate the push constants before recording anything. The
        # native call below always reads _PC2_SIZE bytes from the buffer, so a
        # short buffer would be an out-of-bounds read.
        vp_T = np.ascontiguousarray(view_proj.T, dtype=np.float32)
        tail = np.array(
            [base_extent[0], base_extent[1], mip_count, draw_count, phase, 1 if skip_cull else 0],
            dtype=np.int32,
        )
        pc_bytes = vp_T.tobytes() + tail.tobytes()
        if len(pc_bytes) != _PC2_SIZE:
            raise ValueError(
                f"push constants are {len(pc_bytes)} bytes, expected {_PC2_SIZE}; "
                "view_proj must be a 4x4 matrix"
            )

        transform_size = max_objects * TRANSFORM_DTYPE.itemsize
        aabb_size = max_objects * AABB_DTYPE.itemsize
        vis_size = max_objects * 4  # uint32 per slot
        ds = self._set_for(
            indirect_buffer,
            transform_buf,
            aabb_buf,
            hiz_view,
            hiz_sampler,
            vis_prev_buf,
            vis_next_buf,
            parity,
            max_objects * INDIRECT_DRAW_DTYPE.itemsize,
            transform_size,
            aabb_size,
            vis_size,
        )

        if host_barrier:
            hb = vk.VkMemoryBarrier(
                srcAccessMask=vk.VK_ACCESS_HOST_WRITE_BIT,
                dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
            )
            vk.vkCmdPipelineBarrier(
                cmd,
                vk.VK_PIPELINE_STAGE_HOST_BIT,
                vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
                0,
                1,
                [hb],
                0,
                None,
                0,
                None,
            )

        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._pipeline)
        vk.vkCmdBindDescriptorSets(
            cmd,
            vk.VK_PIPELINE_BIND_POINT_COMPUTE,
            self._layout,
            0,
            1,
            [ds],
            0,
            None,
        )

        cbuf = vk.ffi.new("char[]", pc_bytes)
        vk._vulkan.lib.vkCmdPushConstants(
            cmd,
            self._layout,
            vk.VK_SHADER_STAGE_COMPUTE_BIT,
            0,
            _PC2_SIZE,
            cbuf,
        )

        # One invocation per draw command, rounded up to whole workgroups.
        groups = (draw_count + _LOCAL_SIZE - 1) // _LOCAL_SIZE
        vk.vkCmdDispatch(cmd, groups, 1, 1)

        # Patched instance_count must be visible to the indirect draw that reads it.
        draw_barrier = vk.VkBufferMemoryBarrier(
            srcAccessMask=vk.VK_ACCESS_SHADER_WRITE_BIT,
            dstAccessMask=vk.VK_ACCESS_INDIRECT_COMMAND_READ_BIT,
            srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            buffer=indirect_buffer,
            offset=0,
            size=draw_count * INDIRECT_DRAW_DTYPE.itemsize,
        )
        vk.vkCmdPipelineBarrier(
            cmd,
            vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
            vk.VK_PIPELINE_STAGE_DRAW_INDIRECT_BIT,
            0,
            0,
            None,
            1,
            [draw_barrier],
            0,
            None,
        )

    def cleanup(self) -> None:
        """Destroy every Vulkan object this pass still owns.

        Safe to call at any point: after a successful :meth:`setup`, after a
        ``setup`` that raised part-way (the handles created before the failure
        are released instead of leaked), before ``setup``, or repeatedly. Each
        handle is reset to ``None`` once destroyed so it cannot be destroyed
        twice. The device is only looked up when there is something to destroy.
        """
        owned = (
            self._pipeline,
            self._layout,
            self._module,
            self._desc_pool,
            self._desc_layout,
        )
        if any(owned):
            device = self._engine.ctx.device
            if self._pipeline:
                vk.vkDestroyPipeline(device, self._pipeline, None)
            if self._layout:
                vk.vkDestroyPipelineLayout(device, self._layout, None)
            if self._module:
                vk.vkDestroyShaderModule(device, self._module, None)
            if self._desc_pool:
                # Destroying the pool frees every set allocated from it.
                vk.vkDestroyDescriptorPool(device, self._desc_pool, None)
            if self._desc_layout:
                vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None)
        self._pipeline = None
        self._layout = None
        self._module = None
        self._desc_pool = None
        self._desc_layout = None
        self._sets.clear()
        self._ready = False