# Source code for simvx.graphics.renderer.fog_pass

"""Volumetric fog pass — distance-based and height-based fog via compute shader."""

from __future__ import annotations

import logging
from typing import Any

import numpy as np
import vulkan as vk

from ..gpu.pipeline import create_shader_module
from ..materials.shader_compiler import compile_shader

__all__ = ["FogPass"]

log = logging.getLogger(__name__)

# Push-constant block size in bytes: mat4(64) + vec4(16) + vec4(16) + vec4(16) + vec4(16) = 128 bytes.
# FogPass.render packs, in this order and all as float32:
#   1. the transposed inverse view-projection matrix (mat4),
#   2. (colour.r, colour.g, colour.b, density),
#   3. (start, end, mode code, 0.0),
#   4. (height_enabled as 0/1, height_density, height_falloff, camera_y),
#   5. (width, height, 1/width, 1/height).
# The same size is declared in the pipeline layout's push-constant range.
_PC_SIZE = 128


class FogPass:
    """Compute-based volumetric fog: blends scene colour with fog based on depth.

    Supports linear, exponential, and exponential-squared distance fog, plus
    optional height-based fog falloff. Operates in-place on the HDR colour image
    between the scene render and tone mapping.

    Lifecycle: construct, call :meth:`setup` once the depth/colour views exist,
    call :meth:`render` every frame, :meth:`resize` when the targets are
    recreated, and :meth:`cleanup` before the device is destroyed.
    """

    # Fog mode encoding sent in the push constants: 0=linear, 1=exponential,
    # 2=exponential_squared. Unknown mode strings fall back to exponential.
    _MODE_CODES = {"linear": 0.0, "exponential": 1.0, "exponential_squared": 2.0}

    # (attribute, vulkan destroy function name) for every handle this pass owns,
    # listed in the order they are destroyed by cleanup().
    _OWNED_HANDLES = (
        ("_pipeline", "vkDestroyPipeline"),
        ("_layout", "vkDestroyPipelineLayout"),
        ("_module", "vkDestroyShaderModule"),
        ("_desc_pool", "vkDestroyDescriptorPool"),
        ("_desc_layout", "vkDestroyDescriptorSetLayout"),
        ("_depth_sampler", "vkDestroySampler"),
    )

    def __init__(self, engine: Any):
        """Store the engine reference and default fog settings; no GPU work happens here."""
        self._engine = engine
        self._ready = False

        # Compute pipeline
        self._pipeline: Any = None
        self._layout: Any = None
        self._module: Any = None

        # Descriptors
        self._desc_pool: Any = None
        self._desc_layout: Any = None
        self._desc_set: Any = None
        self._depth_sampler: Any = None

        # Colour image handle (for layout transitions)
        self._colour_image: Any = None

        # Dimensions
        self._width: int = 0
        self._height: int = 0

        # Public settings
        self.enabled: bool = False
        self.colour: tuple[float, float, float] = (0.7, 0.8, 0.9)
        self.density: float = 0.01
        self.start: float = 10.0
        self.end: float = 100.0
        self.mode: str = "exponential"  # "linear", "exponential", "exponential_squared"
        self.height_enabled: bool = False
        self.height_density: float = 0.1
        self.height_falloff: float = 1.0

    def setup(self, width: int, height: int, depth_view: Any, color_view: Any, colour_image: Any = None) -> None:
        """Initialize fog compute pipeline and descriptors.

        Args:
            width: Target width in pixels.
            height: Target height in pixels.
            depth_view: Image view bound as the depth sampler (binding 0).
            color_view: Image view bound as the colour storage image (binding 1).
            colour_image: Optional image handle behind ``color_view``; when given,
                render() performs explicit layout transitions on it.

        Raises:
            Whatever the underlying Vulkan / shader-compile calls raise. Any
            resources created before the failure are released first.
        """
        self._width = width
        self._height = height
        self._colour_image = colour_image
        try:
            self._create_sampler()
            self._create_descriptors(depth_view, color_view)
            self._create_pipeline()
        except Exception:
            # Roll back partially created objects so a failed setup does not leak.
            self.cleanup()
            raise
        self._ready = True
        log.debug("Fog pass initialized (%dx%d)", width, height)

    def _create_sampler(self) -> None:
        """Create nearest-filter sampler for depth texture."""
        self._depth_sampler = vk.vkCreateSampler(
            self._engine.ctx.device,
            vk.VkSamplerCreateInfo(
                magFilter=vk.VK_FILTER_NEAREST,
                minFilter=vk.VK_FILTER_NEAREST,
                addressModeU=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
                addressModeV=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
                addressModeW=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
                anisotropyEnable=vk.VK_FALSE,
                unnormalizedCoordinates=vk.VK_FALSE,
                mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_NEAREST,
            ),
            None,
        )

    def _create_descriptors(self, depth_view: Any, color_view: Any) -> None:
        """Create descriptor set: depth sampler (binding 0) + colour storage image (binding 1)."""
        device = self._engine.ctx.device

        bindings = [
            vk.VkDescriptorSetLayoutBinding(
                binding=0,
                descriptorType=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER,
                descriptorCount=1,
                stageFlags=vk.VK_SHADER_STAGE_COMPUTE_BIT,
            ),
            vk.VkDescriptorSetLayoutBinding(
                binding=1,
                descriptorType=vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE,
                descriptorCount=1,
                stageFlags=vk.VK_SHADER_STAGE_COMPUTE_BIT,
            ),
        ]
        self._desc_layout = vk.vkCreateDescriptorSetLayout(
            device, vk.VkDescriptorSetLayoutCreateInfo(bindingCount=2, pBindings=bindings), None
        )

        # The pool holds exactly the one set this pass needs.
        pool_sizes = [
            vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, descriptorCount=1),
            vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE, descriptorCount=1),
        ]
        self._desc_pool = vk.vkCreateDescriptorPool(
            device, vk.VkDescriptorPoolCreateInfo(maxSets=1, poolSizeCount=2, pPoolSizes=pool_sizes), None
        )

        sets = vk.vkAllocateDescriptorSets(
            device,
            vk.VkDescriptorSetAllocateInfo(
                descriptorPool=self._desc_pool,
                descriptorSetCount=1,
                pSetLayouts=[self._desc_layout],
            ),
        )
        self._desc_set = sets[0]

        self._write_descriptors(depth_view, color_view)

    def _write_descriptors(self, depth_view: Any, color_view: Any) -> None:
        """Write depth sampler and colour storage image to descriptor set."""
        device = self._engine.ctx.device

        depth_info = vk.VkDescriptorImageInfo(
            sampler=self._depth_sampler,
            imageView=depth_view,
            imageLayout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL,
        )
        # Storage images are accessed in GENERAL layout; render() transitions the
        # colour image to match when its handle is known.
        color_info = vk.VkDescriptorImageInfo(
            imageView=color_view,
            imageLayout=vk.VK_IMAGE_LAYOUT_GENERAL,
        )

        vk.vkUpdateDescriptorSets(
            device,
            2,
            [
                vk.VkWriteDescriptorSet(
                    dstSet=self._desc_set,
                    dstBinding=0,
                    dstArrayElement=0,
                    descriptorCount=1,
                    descriptorType=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER,
                    pImageInfo=[depth_info],
                ),
                vk.VkWriteDescriptorSet(
                    dstSet=self._desc_set,
                    dstBinding=1,
                    dstArrayElement=0,
                    descriptorCount=1,
                    descriptorType=vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE,
                    pImageInfo=[color_info],
                ),
            ],
            0,
            None,
        )

    def _create_pipeline(self) -> None:
        """Create fog compute pipeline.

        Raises:
            RuntimeError: If pipeline-layout or compute-pipeline creation does not
                return ``VK_SUCCESS``.
        """
        e = self._engine
        device = e.ctx.device
        ffi = vk.ffi

        fog_spv = compile_shader(e.shader_dir / "fog.comp")
        self._module = create_shader_module(device, fog_spv)

        # Pipeline layout: one descriptor set + one compute-stage push-constant range.
        push_range = ffi.new("VkPushConstantRange*")
        push_range.stageFlags = vk.VK_SHADER_STAGE_COMPUTE_BIT
        push_range.offset = 0
        push_range.size = _PC_SIZE

        layout_ci = ffi.new("VkPipelineLayoutCreateInfo*")
        layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO
        # Keep the cffi array in a local so it stays alive until the create call returns.
        set_layouts = ffi.new("VkDescriptorSetLayout[1]", [self._desc_layout])
        layout_ci.setLayoutCount = 1
        layout_ci.pSetLayouts = set_layouts
        layout_ci.pushConstantRangeCount = 1
        layout_ci.pPushConstantRanges = push_range

        layout_out = ffi.new("VkPipelineLayout*")
        result = vk._vulkan._callApi(
            vk._vulkan.lib.vkCreatePipelineLayout,
            device,
            layout_ci,
            ffi.NULL,
            layout_out,
        )
        if result != vk.VK_SUCCESS:
            raise RuntimeError(f"vkCreatePipelineLayout (fog) failed: {result}")
        self._layout = layout_out[0]

        # Create compute pipeline
        main_name = ffi.new("char[]", b"main")
        ci = ffi.new("VkComputePipelineCreateInfo*")
        ci.sType = vk.VK_STRUCTURE_TYPE_COMPUTE_PIPELINE_CREATE_INFO
        ci.layout = self._layout

        stage = ffi.new("VkPipelineShaderStageCreateInfo*")
        stage.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
        stage.stage = vk.VK_SHADER_STAGE_COMPUTE_BIT
        stage.module = self._module
        stage.pName = main_name
        ci.stage = stage[0]

        pipeline_out = ffi.new("VkPipeline*")
        result = vk._vulkan._callApi(
            vk._vulkan.lib.vkCreateComputePipelines,
            device,
            ffi.NULL,
            1,
            ci,
            ffi.NULL,
            pipeline_out,
        )
        if result != vk.VK_SUCCESS:
            raise RuntimeError(f"vkCreateComputePipelines (fog) failed: {result}")
        self._pipeline = pipeline_out[0]

    def _pack_push_constants(self, inv_vp: np.ndarray, camera_y: float) -> bytes:
        """Serialize the 128-byte push-constant block (32 float32 values).

        Layout: transposed ``inv_vp`` (16 floats), ``(r, g, b, density)``,
        ``(start, end, mode, 0)``, ``(height_enabled, height_density,
        height_falloff, camera_y)``, ``(width, height, 1/width, 1/height)``.

        Raises:
            ValueError: If ``inv_vp`` is not a 4x4 matrix. A wrong shape would
                otherwise make the fixed-size push-constant upload read past the
                end of the buffer.
        """
        matrix = np.asarray(inv_vp, dtype=np.float32)
        if matrix.shape != (4, 4):
            raise ValueError(f"inv_vp must be a 4x4 matrix, got shape {matrix.shape}")

        # Transpose the row-major numpy matrix before upload.
        inv_vp_t = np.ascontiguousarray(matrix.T, dtype=np.float32)
        mode_val = self._MODE_CODES.get(self.mode, 1.0)

        fog_colour = np.array([*self.colour, self.density], dtype=np.float32)
        fog_params = np.array([self.start, self.end, mode_val, 0.0], dtype=np.float32)
        height_params = np.array(
            [
                1.0 if self.height_enabled else 0.0,
                self.height_density,
                self.height_falloff,
                camera_y,
            ],
            dtype=np.float32,
        )
        resolution = np.array(
            [
                float(self._width),
                float(self._height),
                1.0 / self._width,
                1.0 / self._height,
            ],
            dtype=np.float32,
        )
        return b"".join(
            part.tobytes() for part in (inv_vp_t, fog_colour, fog_params, height_params, resolution)
        )

    def _colour_barrier(self, src_access: int, dst_access: int, old_layout: int, new_layout: int) -> Any:
        """Build an image memory barrier over mip 0 / layer 0 of the colour image."""
        return vk.VkImageMemoryBarrier(
            srcAccessMask=src_access,
            dstAccessMask=dst_access,
            oldLayout=old_layout,
            newLayout=new_layout,
            srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            image=self._colour_image,
            subresourceRange=vk.VkImageSubresourceRange(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                baseMipLevel=0,
                levelCount=1,
                baseArrayLayer=0,
                layerCount=1,
            ),
        )

    def render(self, cmd: Any, inv_vp: np.ndarray, camera_y: float = 0.0) -> None:
        """Dispatch fog compute shader. Call after HDR pass ends, before tonemap.

        Does nothing when the pass is not set up, is disabled, or the target has
        a zero-sized extent.

        Args:
            cmd: Active command buffer (outside any render pass).
            inv_vp: Inverse view-projection matrix (row-major numpy).
            camera_y: Camera world Y position for height fog.

        Raises:
            ValueError: If ``inv_vp`` is not 4x4.
        """
        if not self._ready or not self.enabled:
            return
        # A zero-sized target has nothing to fog and would make 1/width undefined.
        if self._width <= 0 or self._height <= 0:
            return

        # Pack first so invalid input fails before any command is recorded.
        pc_data = self._pack_push_constants(inv_vp, camera_y)

        ffi = vk.ffi
        # One workgroup per 8x8 pixel tile, rounded up.
        groups_x = (self._width + 7) // 8
        groups_y = (self._height + 7) // 8
        has_image = bool(self._colour_image)

        # Transition colour image: SHADER_READ_ONLY_OPTIMAL -> GENERAL for storage access
        if has_image:
            to_general = self._colour_barrier(
                vk.VK_ACCESS_SHADER_READ_BIT,
                vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
                vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
                vk.VK_IMAGE_LAYOUT_GENERAL,
            )
            vk.vkCmdPipelineBarrier(
                cmd,
                vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT | vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
                vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
                0,
                0,
                None,
                0,
                None,
                1,
                [to_general],
            )

        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._pipeline)
        vk.vkCmdBindDescriptorSets(
            cmd,
            vk.VK_PIPELINE_BIND_POINT_COMPUTE,
            self._layout,
            0,
            1,
            [self._desc_set],
            0,
            None,
        )

        cbuf = ffi.new("char[]", pc_data)
        vk._vulkan.lib.vkCmdPushConstants(
            cmd,
            self._layout,
            vk.VK_SHADER_STAGE_COMPUTE_BIT,
            0,
            _PC_SIZE,
            cbuf,
        )

        vk.vkCmdDispatch(cmd, groups_x, groups_y, 1)

        if has_image:
            # Transition colour image: GENERAL -> SHADER_READ_ONLY_OPTIMAL for subsequent passes
            to_read_only = self._colour_barrier(
                vk.VK_ACCESS_SHADER_WRITE_BIT,
                vk.VK_ACCESS_SHADER_READ_BIT,
                vk.VK_IMAGE_LAYOUT_GENERAL,
                vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
            )
            vk.vkCmdPipelineBarrier(
                cmd,
                vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
                vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT,
                0,
                0,
                None,
                0,
                None,
                1,
                [to_read_only],
            )
        else:
            # Fallback: memory-only barrier when image handle not available
            barrier = vk.VkMemoryBarrier(
                srcAccessMask=vk.VK_ACCESS_SHADER_WRITE_BIT,
                dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
            )
            vk.vkCmdPipelineBarrier(
                cmd,
                vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT,
                vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT,
                0,
                1,
                [barrier],
                0,
                None,
                0,
                None,
            )

    def resize(self, width: int, height: int, depth_view: Any, color_view: Any, colour_image: Any = None) -> None:
        """Update descriptors for new dimensions.

        No-op until setup() has completed. ``colour_image`` replaces the stored
        image handle only when it is not ``None``.
        """
        if not self._ready:
            return
        self._width = width
        self._height = height
        if colour_image is not None:
            self._colour_image = colour_image
        self._write_descriptors(depth_view, color_view)

    def cleanup(self) -> None:
        """Release all GPU resources.

        Safe to call repeatedly and after a setup() that failed part-way: every
        handle that exists is destroyed and then cleared, regardless of whether
        the pass ever became ready.
        """
        self._ready = False
        # The descriptor set is freed implicitly with its pool; just drop the reference.
        self._desc_set = None

        if not any(getattr(self, attr) for attr, _ in self._OWNED_HANDLES):
            return

        device = self._engine.ctx.device
        for attr, destroy_name in self._OWNED_HANDLES:
            handle = getattr(self, attr)
            if handle:
                getattr(vk, destroy_name)(device, handle, None)
            setattr(self, attr, None)