"""Volumetric fog pass — distance-based and height-based fog via compute shader."""
from __future__ import annotations
import logging
from typing import Any
import numpy as np
import vulkan as vk
from ..gpu.pipeline import create_shader_module
from ..materials.shader_compiler import compile_shader
# Names exported by a star-import of this module.
__all__ = ["FogPass"]
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Push constant: mat4(64) + vec4(16) + vec4(16) + vec4(16) + vec4(16) = 128 bytes
# Slots, in the order FogPass.render packs them: inverse view-projection
# matrix, fog colour + density, distance params (start, end, mode, pad),
# height params (enabled, density, falloff, camera_y), resolution
# (width, height, 1/width, 1/height).
_PC_SIZE = 128
class FogPass:
    """Compute-based volumetric fog: blends scene colour with fog based on depth.

    Supports linear, exponential, and exponential-squared distance fog,
    plus optional height-based fog falloff. Operates in-place on the HDR
    colour image between the scene render and tone mapping.

    Lifecycle: call :meth:`setup` once, :meth:`render` while recording each
    frame, :meth:`resize` when the depth/colour targets are recreated, and
    :meth:`cleanup` to release the Vulkan objects owned by the pass.
    """

    # Encoding of ``mode`` into the float sent to the shader:
    # 0=linear, 1=exponential, 2=exponential_squared.
    _MODE_CODES = {"linear": 0.0, "exponential": 1.0, "exponential_squared": 2.0}
    # Code used when ``mode`` is not one of the names above (same as "exponential").
    _DEFAULT_MODE_CODE = 1.0

    def __init__(self, engine: Any):
        """Store the engine reference and default settings; creates no GPU objects.

        Args:
            engine: Object providing ``ctx.device`` (the Vulkan device) and
                ``shader_dir`` (joined with ``"fog.comp"`` to find the shader).
        """
        self._engine = engine
        # True only between a successful setup() and cleanup().
        self._ready = False
        # Compute pipeline
        self._pipeline: Any = None
        self._layout: Any = None
        self._module: Any = None
        # Descriptors
        self._desc_pool: Any = None
        self._desc_layout: Any = None
        self._desc_set: Any = None
        self._depth_sampler: Any = None
        # Colour image handle (for layout transitions)
        self._colour_image: Any = None
        # Dimensions
        self._width: int = 0
        self._height: int = 0
        # Public settings
        # render() is a no-op while this is False.
        self.enabled: bool = False
        # Fog RGB; packed together with ``density`` into one vec4.
        self.colour: tuple[float, float, float] = (0.7, 0.8, 0.9)
        self.density: float = 0.01
        # Packed as the first two distance parameters; presumably the fog
        # start/end distances used by the shader — confirm against fog.comp.
        self.start: float = 10.0
        self.end: float = 100.0
        self.mode: str = "exponential"  # "linear", "exponential", "exponential_squared"
        # Height fog: on/off flag plus two parameters forwarded to the shader.
        self.height_enabled: bool = False
        self.height_density: float = 0.1
        self.height_falloff: float = 1.0

    def setup(self, width: int, height: int, depth_view: Any, color_view: Any, colour_image: Any = None) -> None:
        """Initialize fog compute pipeline and descriptors.

        Args:
            width: Width of the colour target in pixels.
            height: Height of the colour target in pixels.
            depth_view: Image view bound as the depth sampler (binding 0).
            color_view: Image view bound as the storage image (binding 1).
            colour_image: Image handle used for layout transitions in
                :meth:`render`. When omitted, ``render`` records only a
                memory barrier after the dispatch.

        Raises:
            Exception: Whatever the failing creation step raised; objects
                created before the failure are destroyed first.
        """
        self._width = width
        self._height = height
        self._colour_image = colour_image
        try:
            self._create_sampler()
            self._create_descriptors(depth_view, color_view)
            self._create_pipeline()
        except Exception:
            # Roll back so a failed setup does not leak the objects that
            # were created before the failing step.
            self.cleanup()
            raise
        self._ready = True
        log.debug("Fog pass initialized (%dx%d)", width, height)

    def _create_sampler(self) -> None:
        """Create nearest-filter sampler for depth texture."""
        self._depth_sampler = vk.vkCreateSampler(
            self._engine.ctx.device,
            vk.VkSamplerCreateInfo(
                magFilter=vk.VK_FILTER_NEAREST,
                minFilter=vk.VK_FILTER_NEAREST,
                addressModeU=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
                addressModeV=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
                addressModeW=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
                anisotropyEnable=vk.VK_FALSE,
                unnormalizedCoordinates=vk.VK_FALSE,
                mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_NEAREST,
            ),
            None,
        )

    def _create_descriptors(self, depth_view: Any, color_view: Any) -> None:
        """Create descriptor set: depth sampler (binding 0) + colour storage image (binding 1).

        Creates the set layout, a pool sized for exactly one set, allocates
        that set and writes both bindings.
        """
        device = self._engine.ctx.device
        bindings = [
            vk.VkDescriptorSetLayoutBinding(
                binding=0,
                descriptorType=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER,
                descriptorCount=1,
                stageFlags=vk.VK_SHADER_STAGE_COMPUTE_BIT,
            ),
            vk.VkDescriptorSetLayoutBinding(
                binding=1,
                descriptorType=vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE,
                descriptorCount=1,
                stageFlags=vk.VK_SHADER_STAGE_COMPUTE_BIT,
            ),
        ]
        self._desc_layout = vk.vkCreateDescriptorSetLayout(
            device, vk.VkDescriptorSetLayoutCreateInfo(bindingCount=2, pBindings=bindings), None
        )
        pool_sizes = [
            vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, descriptorCount=1),
            vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE, descriptorCount=1),
        ]
        self._desc_pool = vk.vkCreateDescriptorPool(
            device, vk.VkDescriptorPoolCreateInfo(maxSets=1, poolSizeCount=2, pPoolSizes=pool_sizes), None
        )
        sets = vk.vkAllocateDescriptorSets(
            device,
            vk.VkDescriptorSetAllocateInfo(
                descriptorPool=self._desc_pool,
                descriptorSetCount=1,
                pSetLayouts=[self._desc_layout],
            ),
        )
        self._desc_set = sets[0]
        self._write_descriptors(depth_view, color_view)

    def _write_descriptors(self, depth_view: Any, color_view: Any) -> None:
        """Write depth sampler and colour storage image to descriptor set."""
        device = self._engine.ctx.device
        depth_info = vk.VkDescriptorImageInfo(
            sampler=self._depth_sampler,
            imageView=depth_view,
            imageLayout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL,
        )
        # The storage image is accessed in GENERAL layout; render() performs
        # the transition when it has the image handle.
        color_info = vk.VkDescriptorImageInfo(
            imageView=color_view,
            imageLayout=vk.VK_IMAGE_LAYOUT_GENERAL,
        )
        vk.vkUpdateDescriptorSets(
            device,
            2,
            [
                vk.VkWriteDescriptorSet(
                    dstSet=self._desc_set,
                    dstBinding=0,
                    dstArrayElement=0,
                    descriptorCount=1,
                    descriptorType=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER,
                    pImageInfo=[depth_info],
                ),
                vk.VkWriteDescriptorSet(
                    dstSet=self._desc_set,
                    dstBinding=1,
                    dstArrayElement=0,
                    descriptorCount=1,
                    descriptorType=vk.VK_DESCRIPTOR_TYPE_STORAGE_IMAGE,
                    pImageInfo=[color_info],
                ),
            ],
            0,
            None,
        )

    def _create_pipeline(self) -> None:
        """Create fog compute pipeline.

        Compiles ``fog.comp``, then builds the pipeline layout (one
        descriptor set + one push-constant range) and the compute pipeline
        through the raw cffi entry points.

        Raises:
            RuntimeError: If layout or pipeline creation does not return
                ``VK_SUCCESS``.
        """
        e = self._engine
        device = e.ctx.device
        ffi = vk.ffi
        fog_spv = compile_shader(e.shader_dir / "fog.comp")
        self._module = create_shader_module(device, fog_spv)

        # NOTE: every ffi.new() object below is kept in a local so the
        # memory it owns stays alive until the create call has returned.
        push_range = ffi.new("VkPushConstantRange*")
        push_range.stageFlags = vk.VK_SHADER_STAGE_COMPUTE_BIT
        push_range.offset = 0
        push_range.size = _PC_SIZE
        layout_ci = ffi.new("VkPipelineLayoutCreateInfo*")
        layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO
        set_layouts = ffi.new("VkDescriptorSetLayout[1]", [self._desc_layout])
        layout_ci.setLayoutCount = 1
        layout_ci.pSetLayouts = set_layouts
        layout_ci.pushConstantRangeCount = 1
        layout_ci.pPushConstantRanges = push_range
        layout_out = ffi.new("VkPipelineLayout*")
        result = vk._vulkan._callApi(
            vk._vulkan.lib.vkCreatePipelineLayout,
            device,
            layout_ci,
            ffi.NULL,
            layout_out,
        )
        if result != vk.VK_SUCCESS:
            raise RuntimeError(f"vkCreatePipelineLayout (fog) failed: {result}")
        self._layout = layout_out[0]

        # Create compute pipeline
        main_name = ffi.new("char[]", b"main")
        ci = ffi.new("VkComputePipelineCreateInfo*")
        ci.sType = vk.VK_STRUCTURE_TYPE_COMPUTE_PIPELINE_CREATE_INFO
        ci.layout = self._layout
        stage = ffi.new("VkPipelineShaderStageCreateInfo*")
        stage.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
        stage.stage = vk.VK_SHADER_STAGE_COMPUTE_BIT
        stage.module = self._module
        stage.pName = main_name
        ci.stage = stage[0]
        pipeline_out = ffi.new("VkPipeline*")
        result = vk._vulkan._callApi(
            vk._vulkan.lib.vkCreateComputePipelines,
            device,
            ffi.NULL,
            1,
            ci,
            ffi.NULL,
            pipeline_out,
        )
        if result != vk.VK_SUCCESS:
            raise RuntimeError(f"vkCreateComputePipelines (fog) failed: {result}")
        self._pipeline = pipeline_out[0]

    def _pack_push_constants(self, inv_vp: Any, camera_y: float) -> bytes:
        """Serialise the push-constant block: one mat4 followed by four vec4s.

        The caller must ensure the stored width and height are non-zero
        (their reciprocals are packed).

        Args:
            inv_vp: 4x4 inverse view-projection matrix (row-major).
            camera_y: Camera world Y position for height fog.

        Returns:
            128 bytes of float32 data: transposed matrix, (r, g, b, density),
            (start, end, mode, 0), (height_enabled, height_density,
            height_falloff, camera_y), (width, height, 1/width, 1/height).

        Raises:
            ValueError: If ``inv_vp`` is not 4x4 or ``colour`` does not have
                exactly three components. Either would make the buffer a
                different size than the range handed to the driver.
        """
        # Transposed before serialising, as the original layout requires.
        inv_vp_t = np.ascontiguousarray(np.asarray(inv_vp, dtype=np.float32).T)
        if inv_vp_t.shape != (4, 4):
            raise ValueError(f"inv_vp must be a 4x4 matrix, got shape {inv_vp_t.shape}")
        fog_colour = np.array([*self.colour, self.density], dtype=np.float32)
        if fog_colour.shape != (4,):
            raise ValueError("colour must have exactly three components (r, g, b)")
        # Unknown mode names fall back to the exponential code.
        mode_val = self._MODE_CODES.get(self.mode, self._DEFAULT_MODE_CODE)
        fog_params = np.array([self.start, self.end, mode_val, 0.0], dtype=np.float32)
        height_params = np.array(
            [
                1.0 if self.height_enabled else 0.0,
                self.height_density,
                self.height_falloff,
                camera_y,
            ],
            dtype=np.float32,
        )
        resolution = np.array(
            [
                float(self._width),
                float(self._height),
                1.0 / self._width,
                1.0 / self._height,
            ],
            dtype=np.float32,
        )
        parts = (inv_vp_t, fog_colour, fog_params, height_params, resolution)
        return b"".join(part.tobytes() for part in parts)

    def _transition_colour_image(
        self,
        cmd: Any,
        *,
        src_access: int,
        dst_access: int,
        old_layout: int,
        new_layout: int,
        src_stage: int,
        dst_stage: int,
    ) -> None:
        """Record a layout-transition barrier on mip 0 / layer 0 of the colour image."""
        img_barrier = vk.VkImageMemoryBarrier(
            srcAccessMask=src_access,
            dstAccessMask=dst_access,
            oldLayout=old_layout,
            newLayout=new_layout,
            srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            image=self._colour_image,
            subresourceRange=vk.VkImageSubresourceRange(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                baseMipLevel=0, levelCount=1, baseArrayLayer=0, layerCount=1,
            ),
        )
        vk.vkCmdPipelineBarrier(
            cmd,
            src_stage,
            dst_stage,
            0, 0, None, 0, None, 1, [img_barrier],
        )

    def render(self, cmd: Any, inv_vp: np.ndarray, camera_y: float = 0.0) -> None:
        """Dispatch fog compute shader. Call after HDR pass ends, before tonemap.

        Does nothing when the pass is not set up, is disabled, or the target
        has a zero-sized extent.

        Args:
            cmd: Active command buffer (outside any render pass).
            inv_vp: Inverse view-projection matrix (row-major numpy).
            camera_y: Camera world Y position for height fog.

        Raises:
            ValueError: If ``inv_vp`` is not 4x4 or ``colour`` is malformed.
        """
        if not self._ready or not self.enabled:
            return
        if self._width <= 0 or self._height <= 0:
            # Nothing to shade (e.g. minimised window); also avoids dividing
            # by zero when packing the reciprocal resolution.
            return

        # Build (and validate) the push constants before recording anything,
        # so bad input cannot leave a half-recorded layout transition.
        pc_data = self._pack_push_constants(inv_vp, camera_y)

        ffi = vk.ffi
        # One workgroup per 8x8 tile, rounded up to cover partial tiles.
        groups_x = (self._width + 7) // 8
        groups_y = (self._height + 7) // 8
        has_image = bool(self._colour_image)
        compute = vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT
        fragment_or_compute = vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT | vk.VK_PIPELINE_STAGE_COMPUTE_SHADER_BIT

        # Transition colour image: SHADER_READ_ONLY_OPTIMAL -> GENERAL for storage access
        if has_image:
            self._transition_colour_image(
                cmd,
                src_access=vk.VK_ACCESS_SHADER_READ_BIT,
                dst_access=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
                old_layout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
                new_layout=vk.VK_IMAGE_LAYOUT_GENERAL,
                src_stage=fragment_or_compute,
                dst_stage=compute,
            )

        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_COMPUTE, self._pipeline)
        vk.vkCmdBindDescriptorSets(
            cmd,
            vk.VK_PIPELINE_BIND_POINT_COMPUTE,
            self._layout,
            0,
            1,
            [self._desc_set],
            0,
            None,
        )
        cbuf = ffi.new("char[]", pc_data)
        vk._vulkan.lib.vkCmdPushConstants(
            cmd,
            self._layout,
            vk.VK_SHADER_STAGE_COMPUTE_BIT,
            0,
            _PC_SIZE,
            cbuf,
        )
        vk.vkCmdDispatch(cmd, groups_x, groups_y, 1)

        if has_image:
            # Transition colour image: GENERAL -> SHADER_READ_ONLY_OPTIMAL for subsequent passes
            self._transition_colour_image(
                cmd,
                src_access=vk.VK_ACCESS_SHADER_WRITE_BIT,
                dst_access=vk.VK_ACCESS_SHADER_READ_BIT,
                old_layout=vk.VK_IMAGE_LAYOUT_GENERAL,
                new_layout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
                src_stage=compute,
                dst_stage=fragment_or_compute,
            )
        else:
            # Fallback: memory-only barrier when image handle not available
            barrier = vk.VkMemoryBarrier(
                srcAccessMask=vk.VK_ACCESS_SHADER_WRITE_BIT,
                dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT | vk.VK_ACCESS_SHADER_WRITE_BIT,
            )
            vk.vkCmdPipelineBarrier(
                cmd,
                compute,
                fragment_or_compute,
                0, 1, [barrier], 0, None, 0, None,
            )

    def resize(self, width: int, height: int, depth_view: Any, color_view: Any, colour_image: Any = None) -> None:
        """Update descriptors for new dimensions.

        No-op until :meth:`setup` has succeeded. The existing descriptor set
        is rewritten in place; the pipeline and sampler are reused.

        Args:
            width: New target width in pixels.
            height: New target height in pixels.
            depth_view: New depth image view (binding 0).
            color_view: New colour image view (binding 1).
            colour_image: New colour image handle. When ``None`` the
                previously stored handle is kept, so pass the new handle
                whenever the image itself was recreated.
        """
        if not self._ready:
            return
        self._width = width
        self._height = height
        if colour_image is not None:
            self._colour_image = colour_image
        self._write_descriptors(depth_view, color_view)

    def cleanup(self) -> None:
        """Release all GPU resources.

        Safe to call at any time: after a failed or missing :meth:`setup`
        it destroys only what was actually created, and repeated calls are
        no-ops because every handle is cleared once destroyed.
        """
        # Stop render()/resize() from touching objects that are going away.
        self._ready = False
        owned = (
            self._pipeline,
            self._layout,
            self._module,
            self._desc_pool,
            self._desc_layout,
            self._depth_sampler,
        )
        if not any(owned):
            self._desc_set = None
            return

        device = self._engine.ctx.device
        if self._pipeline:
            vk.vkDestroyPipeline(device, self._pipeline, None)
            self._pipeline = None
        if self._layout:
            vk.vkDestroyPipelineLayout(device, self._layout, None)
            self._layout = None
        if self._module:
            vk.vkDestroyShaderModule(device, self._module, None)
            self._module = None
        if self._desc_pool:
            vk.vkDestroyDescriptorPool(device, self._desc_pool, None)
            self._desc_pool = None
        # The set was allocated from the pool above, so only the reference
        # is dropped here.
        self._desc_set = None
        if self._desc_layout:
            vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None)
            self._desc_layout = None
        if self._depth_sampler:
            vk.vkDestroySampler(device, self._depth_sampler, None)
            self._depth_sampler = None