Source code for simvx.graphics.renderer.volumetric_fog_pass

"""Volumetric fog pass: single-scatter ray-marched fog (desktop Vulkan).

Full-screen HDR pass ported from the web ``volumetric_fog.wgsl`` shader. Reads
the HDR colour + depth produced by the forward pass, ray-marches the
WorldEnvironment-driven global fog (plus an exponential height gradient and any
``FogVolume3D`` instances collected from the scene tree), and writes the
composited result to an alternate HDR target. The renderer then re-points the
tonemap pass's HDR input at that output: the same swap the custom
post-process pass uses.

GPU-driven: the FogVolume3D set is packed into a single SSBO once per frame in
numpy (``build_volume_ssbo``), never iterated in a per-object Python draw loop.
"""

import logging
from typing import Any

import numpy as np
import vulkan as vk

from ..gpu.descriptors import (
    DescriptorWriteBatch,
    allocate_descriptor_set,
    create_descriptor_set_layout,
)
from ..gpu.memory import create_buffer, create_sampler, upload_numpy
from ..gpu.pipeline import create_shader_module
from ..materials.shader_compiler import compile_shader
from .render_target import RenderTarget
from .transparency import extract_camera_position

__all__ = ["VolumetricFogPass", "MAX_FOG_VOLUMES", "FOG_VOLUME_STRIDE", "build_volume_ssbo"]

log = logging.getLogger(__name__)

# Uniform layout (see volumetric_fog.frag ``Uniforms``):
#   mat4 inv_vp(64) + vec4 sun_dir(16) + vec4 sun_colour(16) + vec4 albedo(16)
#   + vec4 emission(16) + vec4 params(16) + vec4 camera_pos(16) + vec4 height(16)
# = 64 + 7*16 = 176 bytes.
_UBO_SIZE = 176

# FogVolume SSBO stride: mat4 inv_transform(64) + vec4 albedo(16)
#   + vec4 params(16) + vec4 extra(16) = 112 bytes (std430, vec4-aligned).
FOG_VOLUME_STRIDE = 112
# Cap the per-frame volume SSBO. The buffer is allocated once at this size; the
# march loop only iterates the live count uploaded each frame.
MAX_FOG_VOLUMES = 64

# Ray-march step count (matches the web shader's fixed 32).
STEP_COUNT = 32


[docs] def build_volume_ssbo(volumes: list[Any]) -> tuple[np.ndarray, int]: """Pack ``FogVolume3D`` nodes into a flat std430 byte array. Returns ``(bytes_array, count)`` where ``count`` is clamped to ``MAX_FOG_VOLUMES``. Each record is:: mat4 inv_transform (world → unit-local, column-major for GLSL) vec4 albedo (rgb, a unused) vec4 params (density, shape, edge_falloff, height_falloff) vec4 extra (priority, 0, 0, 0) ``inv_transform`` folds the node's world transform with a per-axis scale of ``size * 0.5`` so the shape test in the shader is against a unit box / unit sphere / unit cylinder regardless of the volume's placement. """ from simvx.core.math.matrices import mat4_from_trs n = min(len(volumes), MAX_FOG_VOLUMES) buf = np.zeros(max(1, n) * FOG_VOLUME_STRIDE, dtype=np.uint8) f32 = buf.view(np.float32) for i in range(n): v = volumes[i] size = v.size ws = v.world_scale half = np.array( [max(float(size[0]) * 0.5 * float(ws[0]), 1e-4), max(float(size[1]) * 0.5 * float(ws[1]), 1e-4), max(float(size[2]) * 0.5 * float(ws[2]), 1e-4)], dtype=np.float32, ) # World transform of the node (row-major), with the per-axis scale set # to half-extents so the shape test in the shader is against a unit # box / sphere / cylinder regardless of placement. model = mat4_from_trs( v.world_position, v.world_rotation, half, ).astype(np.float32) try: inv = np.linalg.inv(model).astype(np.float32) except np.linalg.LinAlgError: inv = np.eye(4, dtype=np.float32) base = i * FOG_VOLUME_STRIDE // 4 # GLSL/std430 mat4 is column-major; numpy is row-major → transpose. f32[base:base + 16] = inv.T.ravel() alb = v.albedo f32[base + 16:base + 20] = [float(alb[0]), float(alb[1]), float(alb[2]), float(alb[3]) if len(alb) > 3 else 1.0] f32[base + 20:base + 24] = [ v.effective_density, float(int(v.shape)), float(v.falloff), float(v.height_falloff), ] f32[base + 24] = float(v.priority) return buf, n
[docs] class VolumetricFogPass: """Ray-marched single-scatter volumetric fog over the HDR target.""" def __init__(self, engine: Any) -> None: self._engine = engine self._ready = False self.enabled = False # Global fog settings (driven by WorldEnvironment via env sync). self.density = 0.05 self.length = 64.0 self.anisotropy = 0.2 self.albedo: tuple[float, float, float, float] = (1.0, 1.0, 1.0, 1.0) self.emission: tuple[float, float, float, float] = (0.0, 0.0, 0.0, 1.0) self.gi_inject = 0.0 self.fog_height = 0.0 self.fog_height_density = 0.0 # Per-frame volume payload, set by the renderer before render(). self._volume_bytes: np.ndarray | None = None self._volume_count = 0 # Per-frame sun + camera, set by the renderer before render(). self._sun_dir = np.array([0.577, 0.577, 0.577], dtype=np.float32) self._sun_colour = np.array([1.0, 1.0, 1.0], dtype=np.float32) self._sun_intensity = 1.0 self._inv_vp = np.eye(4, dtype=np.float32) self._camera_pos = np.zeros(3, dtype=np.float32) self._width = 0 self._height = 0 self._target: RenderTarget | None = None self._sampler: Any = None self._depth_sampler: Any = None self._ubo_buf: Any = None self._ubo_mem: Any = None self._vol_buf: Any = None self._vol_mem: Any = None self._desc_layout: Any = None self._desc_pool: Any = None self._desc_set: Any = None self._pipeline: Any = None self._pipeline_layout: Any = None self._vert_module: Any = None self._frag_module: Any = None
[docs] @property def output_view(self) -> Any: """Colour view of the fog-composited HDR result (tonemap input).""" return self._target.colour_view if self._target else None
[docs] def setup(self, width: int, height: int, hdr_view: Any, depth_view: Any, colour_format: int) -> None: """Allocate the alt HDR target, buffers, descriptors and pipeline.""" e = self._engine device = e.ctx.device self._width, self._height = width, height self._target = RenderTarget( device, e.ctx.physical_device, width, height, colour_format=colour_format, use_depth=False, queue=e.ctx.graphics_queue, command_pool=e.ctx.command_pool, ) self._sampler = create_sampler(device) self._depth_sampler = create_sampler(device, filter_mode=vk.VK_FILTER_NEAREST) self._ubo_buf, self._ubo_mem = create_buffer( device, e.ctx.physical_device, _UBO_SIZE, vk.VK_BUFFER_USAGE_UNIFORM_BUFFER_BIT, vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT, ) self._vol_buf, self._vol_mem = create_buffer( device, e.ctx.physical_device, MAX_FOG_VOLUMES * FOG_VOLUME_STRIDE, vk.VK_BUFFER_USAGE_STORAGE_BUFFER_BIT, vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT, ) fs = vk.VK_SHADER_STAGE_FRAGMENT_BIT self._desc_layout = create_descriptor_set_layout(device, [ (0, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, fs, 1), # hdr (1, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, fs, 1), # depth (2, vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER, fs, 1), # uniforms (3, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, fs, 1), # fog volumes ]) pool_sizes = [ vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, descriptorCount=2), vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER, descriptorCount=1), vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, descriptorCount=1), ] self._desc_pool = vk.vkCreateDescriptorPool(device, vk.VkDescriptorPoolCreateInfo( maxSets=1, poolSizeCount=len(pool_sizes), pPoolSizes=pool_sizes, ), None) self._desc_set = allocate_descriptor_set(device, self._desc_pool, self._desc_layout) self._write_descriptors(hdr_view, depth_view) shader_dir = e.shader_dir self._vert_module = create_shader_module(device, compile_shader(shader_dir / "volumetric_fog.vert")) self._frag_module = create_shader_module(device, compile_shader(shader_dir / "volumetric_fog.frag")) self._create_pipeline(device, self._target.render_pass, (width, height)) self._ready = True log.debug("Volumetric fog pass initialised (%dx%d, max %d volumes)", width, height, MAX_FOG_VOLUMES)
def _write_descriptors(self, hdr_view: Any, depth_view: Any) -> None: with DescriptorWriteBatch(self._engine.ctx.device) as batch: batch.image(self._desc_set, 0, hdr_view, self._sampler) batch.image(self._desc_set, 1, depth_view, self._depth_sampler, image_layout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL) batch.uniform_buffer(self._desc_set, 2, self._ubo_buf, _UBO_SIZE) batch.ssbo(self._desc_set, 3, self._vol_buf, MAX_FOG_VOLUMES * FOG_VOLUME_STRIDE)
[docs] def update_hdr_input(self, hdr_view: Any, depth_view: Any) -> None: """Re-point binding 0/1 at a new HDR colour + depth (after resize).""" if self._ready: self._write_descriptors(hdr_view, depth_view)
# -- per-frame inputs ----------------------------------------------------
[docs] def set_frame_data(self, view: np.ndarray, proj: np.ndarray, sun_dir: np.ndarray | None, sun_colour: np.ndarray | None, sun_intensity: float, volumes: list[Any]) -> None: """Stash camera / sun / volume data for the next ``render`` call.""" vp = (proj @ view).astype(np.float32) try: self._inv_vp = np.linalg.inv(vp).astype(np.float32) except np.linalg.LinAlgError: self._inv_vp = np.eye(4, dtype=np.float32) self._camera_pos = extract_camera_position(view) if sun_dir is not None and float(np.linalg.norm(sun_dir)) > 1e-6: # The shader expects the direction TOWARD the sun; lights store the # direction the light travels, so negate. self._sun_dir = (-np.asarray(sun_dir, dtype=np.float32)).copy() if sun_colour is not None: self._sun_colour = np.asarray(sun_colour, dtype=np.float32)[:3].copy() self._sun_intensity = float(sun_intensity) self._volume_bytes, self._volume_count = build_volume_ssbo(volumes)
def _upload_uniforms(self) -> None: e = self._engine data = np.zeros(_UBO_SIZE // 4, dtype=np.float32) # mat4 inv_vp (column-major for GLSL → transpose row-major numpy). data[0:16] = self._inv_vp.T.ravel() sd = self._sun_dir data[16:20] = [sd[0], sd[1], sd[2], self._sun_intensity] sc = self._sun_colour data[20:24] = [sc[0], sc[1], sc[2], 0.0] a = self.albedo data[24:28] = [float(a[0]), float(a[1]), float(a[2]), 0.0] em = self.emission data[28:32] = [float(em[0]), float(em[1]), float(em[2]), 0.0] data[32:36] = [float(self.density), float(self.length), float(self.anisotropy), float(self.gi_inject)] cp = self._camera_pos data[36:40] = [cp[0], cp[1], cp[2], 0.0] data[40:44] = [float(self.fog_height), float(self.fog_height_density), float(self._volume_count), float(STEP_COUNT)] upload_numpy(e.ctx.device, self._ubo_mem, data) if self._volume_bytes is not None and self._volume_count > 0: upload_numpy(e.ctx.device, self._vol_mem, self._volume_bytes[: self._volume_count * FOG_VOLUME_STRIDE]) # -- record --------------------------------------------------------------
[docs] def render(self, cmd: Any) -> None: """Composite volumetric fog into the alt HDR target. No-op when off.""" if not self._ready or not self.enabled or not self._pipeline: return self._upload_uniforms() rt = self._target rp_begin = vk.VkRenderPassBeginInfo( renderPass=rt.render_pass, framebuffer=rt.framebuffer, renderArea=vk.VkRect2D(offset=vk.VkOffset2D(x=0, y=0), extent=vk.VkExtent2D(width=rt.width, height=rt.height)), clearValueCount=1, pClearValues=[vk.VkClearValue(color=vk.VkClearColorValue(float32=[0.0, 0.0, 0.0, 1.0]))], ) vk.vkCmdBeginRenderPass(cmd, rp_begin, vk.VK_SUBPASS_CONTENTS_INLINE) vk.vkCmdSetViewport(cmd, 0, 1, [vk.VkViewport( x=0.0, y=0.0, width=float(rt.width), height=float(rt.height), minDepth=0.0, maxDepth=1.0)]) vk.vkCmdSetScissor(cmd, 0, 1, [vk.VkRect2D( offset=vk.VkOffset2D(x=0, y=0), extent=vk.VkExtent2D(width=rt.width, height=rt.height))]) vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline) vk.vkCmdBindDescriptorSets(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline_layout, 0, 1, [self._desc_set], 0, None) vk.vkCmdDraw(cmd, 3, 1, 0, 0) vk.vkCmdEndRenderPass(cmd)
# -- pipeline ------------------------------------------------------------ def _create_pipeline(self, device: Any, render_pass: Any, extent: tuple[int, int]) -> None: ffi = vk.ffi layout_ci = ffi.new("VkPipelineLayoutCreateInfo*") layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO set_layouts = ffi.new("VkDescriptorSetLayout[1]", [self._desc_layout]) layout_ci.setLayoutCount = 1 layout_ci.pSetLayouts = set_layouts layout_out = ffi.new("VkPipelineLayout*") if vk._vulkan._callApi(vk._vulkan.lib.vkCreatePipelineLayout, device, layout_ci, ffi.NULL, layout_out) != vk.VK_SUCCESS: raise RuntimeError("vkCreatePipelineLayout failed (volumetric fog)") self._pipeline_layout = layout_out[0] pi = ffi.new("VkGraphicsPipelineCreateInfo*") pi.sType = vk.VK_STRUCTURE_TYPE_GRAPHICS_PIPELINE_CREATE_INFO stages = ffi.new("VkPipelineShaderStageCreateInfo[2]") main_name = ffi.new("char[]", b"main") stages[0].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO stages[0].stage = vk.VK_SHADER_STAGE_VERTEX_BIT stages[0].module = self._vert_module stages[0].pName = main_name stages[1].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO stages[1].stage = vk.VK_SHADER_STAGE_FRAGMENT_BIT stages[1].module = self._frag_module stages[1].pName = main_name pi.stageCount = 2 pi.pStages = stages vi = ffi.new("VkPipelineVertexInputStateCreateInfo*") vi.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VERTEX_INPUT_STATE_CREATE_INFO pi.pVertexInputState = vi ia = ffi.new("VkPipelineInputAssemblyStateCreateInfo*") ia.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_INPUT_ASSEMBLY_STATE_CREATE_INFO ia.topology = vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST pi.pInputAssemblyState = ia vps = ffi.new("VkPipelineViewportStateCreateInfo*") vps.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VIEWPORT_STATE_CREATE_INFO vps.viewportCount = 1 viewport = ffi.new("VkViewport*") viewport.width = float(extent[0]) viewport.height = float(extent[1]) viewport.maxDepth = 1.0 vps.pViewports = viewport scissor = ffi.new("VkRect2D*") scissor.extent.width = extent[0] scissor.extent.height = extent[1] vps.scissorCount = 1 vps.pScissors = scissor pi.pViewportState = vps rs = ffi.new("VkPipelineRasterizationStateCreateInfo*") rs.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_RASTERIZATION_STATE_CREATE_INFO rs.polygonMode = vk.VK_POLYGON_MODE_FILL rs.lineWidth = 1.0 rs.cullMode = vk.VK_CULL_MODE_NONE pi.pRasterizationState = rs ms = ffi.new("VkPipelineMultisampleStateCreateInfo*") ms.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_MULTISAMPLE_STATE_CREATE_INFO ms.rasterizationSamples = vk.VK_SAMPLE_COUNT_1_BIT pi.pMultisampleState = ms dss = ffi.new("VkPipelineDepthStencilStateCreateInfo*") dss.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DEPTH_STENCIL_STATE_CREATE_INFO dss.depthTestEnable = 0 dss.depthWriteEnable = 0 pi.pDepthStencilState = dss cba = ffi.new("VkPipelineColorBlendAttachmentState*") cba.colorWriteMask = (vk.VK_COLOR_COMPONENT_R_BIT | vk.VK_COLOR_COMPONENT_G_BIT | vk.VK_COLOR_COMPONENT_B_BIT | vk.VK_COLOR_COMPONENT_A_BIT) cb = ffi.new("VkPipelineColorBlendStateCreateInfo*") cb.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_COLOR_BLEND_STATE_CREATE_INFO cb.attachmentCount = 1 cb.pAttachments = cba pi.pColorBlendState = cb dyn_states = ffi.new("VkDynamicState[2]", [vk.VK_DYNAMIC_STATE_VIEWPORT, vk.VK_DYNAMIC_STATE_SCISSOR]) ds = ffi.new("VkPipelineDynamicStateCreateInfo*") ds.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DYNAMIC_STATE_CREATE_INFO ds.dynamicStateCount = 2 ds.pDynamicStates = dyn_states pi.pDynamicState = ds pi.layout = self._pipeline_layout pi.renderPass = render_pass pipeline_out = ffi.new("VkPipeline*") if vk._vulkan._callApi(vk._vulkan.lib.vkCreateGraphicsPipelines, device, ffi.NULL, 1, pi, ffi.NULL, pipeline_out) != vk.VK_SUCCESS: raise RuntimeError("vkCreateGraphicsPipelines failed (volumetric fog)") self._pipeline = pipeline_out[0] # -- resize / cleanup ----------------------------------------------------
[docs] def resize(self, width: int, height: int, hdr_view: Any, depth_view: Any) -> None: if not self._ready: return device = self._engine.ctx.device self._width, self._height = width, height if self._target: self._target.destroy() self._target = RenderTarget( device, self._engine.ctx.physical_device, width, height, colour_format=vk.VK_FORMAT_R16G16B16A16_SFLOAT, use_depth=False, queue=self._engine.ctx.graphics_queue, command_pool=self._engine.ctx.command_pool, ) self._write_descriptors(hdr_view, depth_view)
[docs] def cleanup(self) -> None: if not self._ready: return device = self._engine.ctx.device if self._pipeline: vk.vkDestroyPipeline(device, self._pipeline, None) if self._pipeline_layout: vk.vkDestroyPipelineLayout(device, self._pipeline_layout, None) if self._vert_module: vk.vkDestroyShaderModule(device, self._vert_module, None) if self._frag_module: vk.vkDestroyShaderModule(device, self._frag_module, None) if self._desc_pool: vk.vkDestroyDescriptorPool(device, self._desc_pool, None) if self._desc_layout: vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None) if self._sampler: vk.vkDestroySampler(device, self._sampler, None) if self._depth_sampler: vk.vkDestroySampler(device, self._depth_sampler, None) if self._ubo_buf: vk.vkDestroyBuffer(device, self._ubo_buf, None) if self._ubo_mem: vk.vkFreeMemory(device, self._ubo_mem, None) if self._vol_buf: vk.vkDestroyBuffer(device, self._vol_buf, None) if self._vol_mem: vk.vkFreeMemory(device, self._vol_mem, None) if self._target: self._target.destroy() self._ready = False