"""Hi-Z depth pyramid build pass (occlusion-culling phase O2).
Builds a hierarchical depth pyramid from the HDR target's D32 depth buffer:
mip0 is a 1:1 copy of the scene depth, and each successive mip is a
conservative MAX reduction (farthest occluder, standard 0..1 depth) over the
previous mip's 2x2 footprint. The pyramid is consumed later (O3) by a compute
occlusion cull; this phase only builds and validates it.
The pass is fully gated by ``Renderer._occlusion_culling_enabled``: it is
lazily created on the first frame the toggle is on (mirroring VelocityPass),
so with occlusion off nothing here is allocated or dispatched.
Resource/descriptor approach mirrors :class:`SSAOPass`: a compute pass that
samples the samplable D32 depth target in DEPTH_STENCIL_READ_ONLY_OPTIMAL.
"""
import logging
import math
from typing import Any
import numpy as np
import vulkan as vk
from ..gpu.descriptors import (
DescriptorWriteBatch,
allocate_descriptor_set,
create_descriptor_set_layout,
create_pool_for_types,
)
from ..gpu.pipeline_compute import create_compute_pipeline
__all__ = ["HiZPass", "hiz_mip_count", "hiz_mip_sizes"]
log = logging.getLogger(__name__)
# Push constant: ivec2 dst_size(8) + ivec2 src_size(8) + int mip0(4) = 20 bytes.
_PC_SIZE = 20
def hiz_mip_count(width: int, height: int) -> int:
    """Return the number of mip levels in the Hi-Z pyramid for a base extent.

    Equals ``floor(log2(max(w, h))) + 1`` so the top mip is 1x1. Extents below
    1 are treated as 1, which yields a single level.

    The value is computed with ``int.bit_length`` instead of ``math.log2``:
    for a positive integer both give the same answer, but the integer form
    cannot be pushed over a power-of-two boundary by float rounding on very
    large extents.
    """
    # int() keeps non-int numeric inputs (which math.log2 used to accept)
    # working; truncation does not change floor(log2(x)) for x >= 1.
    return int(max(1, width, height)).bit_length()
def hiz_mip_sizes(width: int, height: int) -> list[tuple[int, int]]:
    """Return the ``(w, h)`` extent of every Hi-Z mip, base level first.

    Level ``i`` is ``(max(1, width >> i), max(1, height >> i))``; the number of
    levels is taken from :func:`hiz_mip_count`.
    """
    levels = range(hiz_mip_count(width, height))
    return [(max(1, width >> lod), max(1, height >> lod)) for lod in levels]
class HiZPass:
    """Builds a Hi-Z (hierarchical max-depth) pyramid via per-mip compute dispatches.

    Typical lifecycle: construct, :meth:`setup` once, :meth:`render` every
    frame, :meth:`resize` when the extent changes, :meth:`cleanup` at shutdown.
    ``render``, ``resize`` and ``cleanup`` are no-ops until ``setup`` has
    completed.
    """

    def __init__(self, engine: Any) -> None:
        """Remember ``engine`` and start with every GPU handle empty.

        Nothing is allocated here; all Vulkan objects are created by
        :meth:`setup`.
        """
        self._engine = engine
        self._ready = False
        # Compute pipeline (one shader, dispatched once per mip).
        self._pipeline: Any = None
        self._layout: Any = None
        self._module: Any = None
        # Mip-chain image (R32_SFLOAT, full mip chain).
        self._image: Any = None
        self._memory: Any = None
        self._mip_views: list[Any] = []
        # Full-chain sampled view (baseMipLevel=0, levelCount=mip_count) for the
        # O3 occlusion cull (textureLod across the pyramid).
        self._sampled_view: Any = None
        # Descriptors: one set per mip (src sampler + dst storage image).
        self._desc_layout: Any = None
        self._desc_pool: Any = None
        self._desc_sets: list[Any] = []
        self._sampler: Any = None
        # Depth source (owned by the caller; never destroyed here).
        self._depth_view: Any = None
        self._depth_image: Any = None
        # Dimensions / mip plan.
        self._width = 0
        self._height = 0
        self._mip_sizes: list[tuple[int, int]] = []

    # ------------------------------------------------------------- accessors
    @property
    def mip_count(self) -> int:
        """Number of per-mip views currently allocated (0 before setup)."""
        return len(self._mip_views)

    @property
    def image(self) -> Any:
        """The Hi-Z R32F mip-chain image (for O3 readers / test readback)."""
        return self._image

    @property
    def sampled_view(self) -> Any:
        """Full-mip-chain sampled view (for the O3 occlusion cull's textureLod)."""
        return self._sampled_view

    @property
    def sampler(self) -> Any:
        """The NEAREST/CLAMP sampler shared by the build and the O3 cull."""
        return self._sampler

    @property
    def base_extent(self) -> tuple[int, int]:
        """Base (mip0) extent ``(width, height)`` of the pyramid."""
        return (self._width, self._height)

    # ----------------------------------------------------------------- setup
    def setup(self, width: int, height: int, depth_view: Any, depth_image: Any) -> None:
        """Allocate the pyramid image, per-mip views, sampler, descriptors and pipeline.

        Args:
            width: Base (mip0) width in pixels.
            height: Base (mip0) height in pixels.
            depth_view: Image view of the scene depth target sampled for mip0.
            depth_image: The depth image itself, used for the pre-build barrier
                in :meth:`render`; a falsy value skips that barrier.
        """
        self._configure(width, height, depth_view, depth_image)
        self._create_image()
        self._create_sampler()
        self._create_descriptors()
        self._create_pipeline()
        self._ready = True
        log.debug("Hi-Z pass initialised (%dx%d, %d mips)", width, height, len(self._mip_sizes))

    def _configure(self, width: int, height: int, depth_view: Any, depth_image: Any) -> None:
        """Record the extent and depth source, and derive the per-mip sizes."""
        self._width = width
        self._height = height
        self._depth_view = depth_view
        self._depth_image = depth_image
        self._mip_sizes = hiz_mip_sizes(width, height)

    # ------------------------------------------------------------------ image
    @staticmethod
    def _color_range(base_mip: int, level_count: int) -> Any:
        """Colour-aspect, single-layer subresource range over ``level_count`` mips."""
        return vk.VkImageSubresourceRange(
            aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
            baseMipLevel=base_mip,
            levelCount=level_count,
            baseArrayLayer=0,
            layerCount=1,
        )

    def _create_view(self, base_mip: int, level_count: int) -> Any:
        """Create a 2D R32F view of the pyramid covering the given mip span."""
        return vk.vkCreateImageView(
            self._engine.ctx.device,
            vk.VkImageViewCreateInfo(
                image=self._image,
                viewType=vk.VK_IMAGE_VIEW_TYPE_2D,
                format=vk.VK_FORMAT_R32_SFLOAT,
                subresourceRange=self._color_range(base_mip, level_count),
            ),
            None,
        )

    def _hiz_barrier(
        self,
        cmd: Any,
        *,
        src_stage: int,
        dst_stage: int,
        src_access: int,
        dst_access: int,
        old_layout: int,
        base_mip: int,
        level_count: int,
    ) -> None:
        """Record an image barrier on the pyramid that leaves the range in GENERAL."""
        barrier = vk.VkImageMemoryBarrier(
            srcAccessMask=src_access,
            dstAccessMask=dst_access,
            oldLayout=old_layout,
            newLayout=vk.VK_IMAGE_LAYOUT_GENERAL,
            srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            image=self._image,
            subresourceRange=self._color_range(base_mip, level_count),
        )
        vk.vkCmdPipelineBarrier(cmd, src_stage, dst_stage, 0, 0, None, 0, None, 1, [barrier])

    def _create_image(self) -> None:
        """Create the R32F mip-chain image (storage + sampled + transfer) and its views.

        The shared ``gpu.memory.create_image`` hardcodes ``mipLevels=1``, so the
        VkImageCreateInfo is inlined here with the full mip count. TRANSFER_SRC is
        included so each mip can be copied back for the GPU readback test, and
        TRANSFER_DST so the image can be cleared to far=1.0 on (re)allocation
        (belt-and-suspenders against a stray O3 sample on the first frame); both
        are cheap, harmless usage bits on a storage image.
        """
        from ..gpu.memory import (
            _find_memory_type,
            begin_single_time_commands,
            end_single_time_commands,
        )

        ctx = self._engine.ctx
        device = ctx.device
        mip_count = len(self._mip_sizes)

        self._image = vk.vkCreateImage(
            device,
            vk.VkImageCreateInfo(
                imageType=vk.VK_IMAGE_TYPE_2D,
                format=vk.VK_FORMAT_R32_SFLOAT,
                extent=vk.VkExtent3D(width=self._width, height=self._height, depth=1),
                mipLevels=mip_count,
                arrayLayers=1,
                samples=vk.VK_SAMPLE_COUNT_1_BIT,
                tiling=vk.VK_IMAGE_TILING_OPTIMAL,
                usage=vk.VK_IMAGE_USAGE_STORAGE_BIT
                | vk.VK_IMAGE_USAGE_SAMPLED_BIT
                | vk.VK_IMAGE_USAGE_TRANSFER_SRC_BIT
                | vk.VK_IMAGE_USAGE_TRANSFER_DST_BIT,
                sharingMode=vk.VK_SHARING_MODE_EXCLUSIVE,
                initialLayout=vk.VK_IMAGE_LAYOUT_UNDEFINED,
            ),
            None,
        )
        reqs = vk.vkGetImageMemoryRequirements(device, self._image)
        self._memory = vk.vkAllocateMemory(
            device,
            vk.VkMemoryAllocateInfo(
                allocationSize=reqs.size,
                memoryTypeIndex=_find_memory_type(
                    ctx.physical_device, reqs.memoryTypeBits, vk.VK_MEMORY_PROPERTY_DEVICE_LOCAL_BIT,
                ),
            ),
            None,
        )
        vk.vkBindImageMemory(device, self._image, self._memory, 0)

        # One single-level view per mip: each is both the storage write target for
        # its own dispatch and the sampled source for the next mip's reduction.
        # Appended one at a time so a failure part-way leaves the already-created
        # views reachable for _destroy_image().
        self._mip_views = []
        for level in range(mip_count):
            self._mip_views.append(self._create_view(level, 1))
        # Full-chain sampled view for the O3 occlusion cull (textureLod across all mips).
        self._sampled_view = self._create_view(0, mip_count)

        cmd = begin_single_time_commands(device, ctx.command_pool)
        # Transition the whole image UNDEFINED -> GENERAL once (all mips). Storage
        # writes require GENERAL; sampling a GENERAL image in compute is legal, so
        # the image stays in GENERAL across the entire build. The clear below is a
        # transfer operation, so the transition must target the TRANSFER stage for
        # the clear to be ordered after it.
        self._hiz_barrier(
            cmd,
            src_stage=vk.VK_PIPELINE_STAGE_TOP_OF_PIPE_BIT,
            dst_stage=vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
            src_access=0,
            dst_access=vk.VK_ACCESS_TRANSFER_WRITE_BIT,
            old_layout=vk.VK_IMAGE_LAYOUT_UNDEFINED,
            base_mip=0,
            level_count=mip_count,
        )
        # Initialise every mip to far=1.0 so a stray O3 sample before the first
        # build reads "nothing occludes -> keep" (no false culling on frame 0 /
        # the frame after a resize). Belt-and-suspenders alongside hiz_built_once.
        vk.vkCmdClearColorImage(
            cmd,
            self._image,
            vk.VK_IMAGE_LAYOUT_GENERAL,
            vk.VkClearColorValue(float32=[1.0, 1.0, 1.0, 1.0]),
            1,
            [self._color_range(0, mip_count)],
        )
        # Make the cleared texels available to the compute reads/writes that
        # follow. NOTE(review): end_single_time_commands probably also waits for
        # completion, but that is not visible from here, so do not rely on it.
        self._hiz_barrier(
            cmd,
            src_stage=vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
            dst_stage=vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
            src_access=vk.VK_ACCESS_TRANSFER_WRITE_BIT,
            dst_access=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
            old_layout=vk.VK_IMAGE_LAYOUT_GENERAL,
            base_mip=0,
            level_count=mip_count,
        )
        end_single_time_commands(device, ctx.graphics_queue, ctx.command_pool, cmd)

    def _create_sampler(self) -> None:
        """NEAREST, CLAMP_TO_EDGE sampler for the depth target and Hi-Z mips."""
        clamp = vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE
        self._sampler = vk.vkCreateSampler(
            self._engine.ctx.device,
            vk.VkSamplerCreateInfo(
                magFilter=vk.VK_FILTER_NEAREST,
                minFilter=vk.VK_FILTER_NEAREST,
                addressModeU=clamp,
                addressModeV=clamp,
                addressModeW=clamp,
                anisotropyEnable=vk.VK_FALSE,
                unnormalizedCoordinates=vk.VK_FALSE,
                mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_NEAREST,
            ),
            None,
        )

    # ------------------------------------------------------------ descriptors
    def _create_descriptors(self) -> None:
        """One descriptor set per mip: binding0 = src sampler2D, binding1 = dst storage image.

        The set layout does not depend on the extent, so it is created once and
        reused across :meth:`resize`; only the pool and sets are rebuilt. Creating
        a fresh layout on every call would orphan the previous handle, because
        :meth:`_destroy_descriptors` only destroys the pool.
        """
        device = self._engine.ctx.device
        cs = vk.VK_SHADER_STAGE_COMPUTE_BIT
        mip_count = len(self._mip_sizes)
        if not self._desc_layout:
            self._desc_layout = create_descriptor_set_layout(device, [
                (0, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, cs, 1),
                (1, vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE, cs, 1),
            ])
        self._desc_pool = create_pool_for_types(
            device,
            {
                vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER: mip_count,
                vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE: mip_count,
            },
            max_sets=mip_count,
        )
        self._desc_sets = [
            allocate_descriptor_set(device, self._desc_pool, self._desc_layout)
            for _ in range(mip_count)
        ]
        self._write_descriptors()

    def _write_descriptors(self) -> None:
        """Bind, per mip i: src = depth target (mip0) or Hi-Z mip(i-1); dst = Hi-Z mip i."""
        with DescriptorWriteBatch(self._engine.ctx.device) as batch:
            for level, ds in enumerate(self._desc_sets):
                if level == 0:
                    # mip0 samples the depth target in DEPTH_STENCIL_READ_ONLY_OPTIMAL.
                    src_view = self._depth_view
                    src_layout = vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL
                else:
                    # Reduction mips sample the previous Hi-Z mip (GENERAL).
                    src_view = self._mip_views[level - 1]
                    src_layout = vk.VK_IMAGE_LAYOUT_GENERAL
                batch.image(ds, 0, src_view, self._sampler, image_layout=src_layout)
                batch.storage_image(ds, 1, self._mip_views[level])

    def _create_pipeline(self) -> None:
        """Build the ``hiz_build.comp`` compute pipeline with a ``_PC_SIZE`` push-constant block."""
        e = self._engine
        self._pipeline, self._layout, self._module = create_compute_pipeline(
            e.ctx.device, e.shader_dir / "hiz_build.comp", [self._desc_layout], _PC_SIZE,
        )

    # ----------------------------------------------------------------- render
    def _record_depth_barrier(self, cmd: Any) -> None:
        """Make the scene depth writes visible to the mip0 compute read.

        Depth can be written in either the early or the late fragment-test
        stage, and a barrier's access scope only covers the stages named in its
        stage mask, so both are listed as the source.
        """
        barrier = vk.VkImageMemoryBarrier(
            srcAccessMask=vk.VK_ACCESS_DEPTH_STENCIL_ATTACHMENT_WRITE_BIT,
            dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT,
            oldLayout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL,
            newLayout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL,
            srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            image=self._depth_image,
            subresourceRange=vk.VkImageSubresourceRange(
                aspectMask=vk.VK_IMAGE_ASPECT_DEPTH_BIT,
                baseMipLevel=0, levelCount=1, baseArrayLayer=0, layerCount=1,
            ),
        )
        vk.vkCmdPipelineBarrier(
            cmd,
            vk.VK_PIPELINE_STAGE_EARLY_FRAGMENT_TESTS_BIT
            | vk.VK_PIPELINE_STAGE_LATE_FRAGMENT_TESTS_BIT,
            vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
            0, 0, None, 0, None, 1, [barrier],
        )

    def render(self, cmd: Any) -> None:
        """Build the full pyramid: dispatch the reduction once per mip.

        Call after the HDR pass has produced depth (depth in
        DEPTH_STENCIL_READ_ONLY_OPTIMAL). A pure depth reduction: no camera matrices.
        Does nothing until :meth:`setup` has completed.

        NOTE(review): no barrier is recorded after the last mip, so whoever
        samples the pyramid afterwards presumably synchronises against these
        writes itself - confirm in the O3 cull.
        """
        if not self._ready:
            return
        ffi = vk.ffi
        # Ensure depth writes are visible to the mip0 compute read (mirror SSAO).
        if self._depth_image:
            self._record_depth_barrier(cmd)

        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._pipeline)
        last_level = len(self._mip_sizes) - 1
        for level, (dst_w, dst_h) in enumerate(self._mip_sizes):
            vk.vkCmdBindDescriptorSets(
                cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._layout, 0, 1,
                [self._desc_sets[level]], 0, None,
            )
            # mip0 copies the depth target 1:1, so its source size equals its own;
            # every other level reduces the previous mip.
            src_w, src_h = (dst_w, dst_h) if level == 0 else self._mip_sizes[level - 1]
            # Push constants: dst size, src size, then a "this is mip0" flag (5 x int32).
            pc = np.array([dst_w, dst_h, src_w, src_h, 1 if level == 0 else 0], dtype=np.int32)
            payload = ffi.new("char[]", pc.tobytes())
            vk._vulkan.lib.vkCmdPushConstants(
                cmd, self._layout, vk.VK_SHADER_STAGE_COMPUTE_BIT, 0, _PC_SIZE, payload,
            )
            # Ceil-divide by 8 in x and y; presumably matches an 8x8 local size
            # in hiz_build.comp - verify against the shader.
            vk.vkCmdDispatch(cmd, (dst_w + 7) // 8, (dst_h + 7) // 8, 1)
            # Inter-mip barrier: this mip's write must be visible to the next
            # mip's sample (COMPUTE write -> COMPUTE read on the same GENERAL layout).
            if level < last_level:
                self._hiz_barrier(
                    cmd,
                    src_stage=vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
                    dst_stage=vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
                    src_access=vk.VK_ACCESS_SHADER_WRITE_BIT,
                    dst_access=vk.VK_ACCESS_SHADER_READ_BIT,
                    old_layout=vk.VK_IMAGE_LAYOUT_GENERAL,
                    base_mip=level,
                    level_count=1,
                )

    # ----------------------------------------------------------------- resize
    def resize(self, width: int, height: int, depth_view: Any, depth_image: Any) -> None:
        """Recreate the pyramid for a new extent and re-write descriptors.

        The sampler, descriptor set layout and pipeline are kept; only the
        image, its views, and the descriptor pool/sets are rebuilt. Does
        nothing until :meth:`setup` has completed.

        NOTE(review): the old image is destroyed immediately; this assumes the
        caller has made sure the GPU is no longer using it - confirm.
        """
        if not self._ready:
            return
        self._configure(width, height, depth_view, depth_image)
        self._destroy_image()
        self._destroy_descriptors()
        self._create_image()
        self._create_descriptors()

    # ---------------------------------------------------------------- cleanup
    def _destroy_image(self) -> None:
        """Destroy all pyramid views, the image and its memory, clearing each handle."""
        device = self._engine.ctx.device
        for view in self._mip_views:
            vk.vkDestroyImageView(device, view, None)
        self._mip_views = []
        if self._sampled_view:
            vk.vkDestroyImageView(device, self._sampled_view, None)
            self._sampled_view = None
        if self._image:
            vk.vkDestroyImage(device, self._image, None)
            self._image = None
        if self._memory:
            vk.vkFreeMemory(device, self._memory, None)
            self._memory = None

    def _destroy_descriptors(self) -> None:
        """Destroy the descriptor pool and forget its sets; the set layout is kept."""
        device = self._engine.ctx.device
        if self._desc_pool:
            vk.vkDestroyDescriptorPool(device, self._desc_pool, None)
            self._desc_pool = None
        self._desc_sets = []

    def cleanup(self) -> None:
        """Destroy every Vulkan object owned by the pass and mark it not ready.

        Does nothing unless :meth:`setup` has completed. Every handle is reset
        to ``None`` after destruction so no attribute keeps pointing at a
        destroyed object.
        """
        if not self._ready:
            return
        device = self._engine.ctx.device
        if self._pipeline:
            vk.vkDestroyPipeline(device, self._pipeline, None)
            self._pipeline = None
        if self._layout:
            vk.vkDestroyPipelineLayout(device, self._layout, None)
            self._layout = None
        if self._module:
            vk.vkDestroyShaderModule(device, self._module, None)
            self._module = None
        self._destroy_descriptors()
        if self._desc_layout:
            vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None)
            self._desc_layout = None
        if self._sampler:
            vk.vkDestroySampler(device, self._sampler, None)
            self._sampler = None
        self._destroy_image()
        self._ready = False