"""Volumetric fog pass: single-scatter ray-marched fog (desktop Vulkan).
Full-screen HDR pass ported from the web ``volumetric_fog.wgsl`` shader. Reads
the HDR colour + depth produced by the forward pass, ray-marches the
WorldEnvironment-driven global fog (plus an exponential height gradient and any
``FogVolume3D`` instances collected from the scene tree), and writes the
composited result to an alternate HDR target. The renderer then re-points the
tonemap pass's HDR input at that output: the same swap the custom
post-process pass uses.
GPU-driven: the FogVolume3D set is packed into a single SSBO once per frame in
numpy (``build_volume_ssbo``), never iterated in a per-object Python draw loop.
"""
import logging
from typing import Any
import numpy as np
import vulkan as vk
from ..gpu.descriptors import (
DescriptorWriteBatch,
allocate_descriptor_set,
create_descriptor_set_layout,
)
from ..gpu.memory import create_buffer, create_sampler, upload_numpy
from ..gpu.pipeline import create_shader_module
from ..materials.shader_compiler import compile_shader
from .render_target import RenderTarget
from .transparency import extract_camera_position
__all__ = ["VolumetricFogPass", "MAX_FOG_VOLUMES", "FOG_VOLUME_STRIDE", "build_volume_ssbo"]
log = logging.getLogger(__name__)
# Uniform layout (see volumetric_fog.frag ``Uniforms``):
# mat4 inv_vp(64) + vec4 sun_dir(16) + vec4 sun_colour(16) + vec4 albedo(16)
# + vec4 emission(16) + vec4 params(16) + vec4 camera_pos(16) + vec4 height(16)
# = 64 + 7*16 = 176 bytes.
_UBO_SIZE = 176
# FogVolume SSBO stride: mat4 inv_transform(64) + vec4 albedo(16)
# + vec4 params(16) + vec4 extra(16) = 112 bytes (std430, vec4-aligned).
FOG_VOLUME_STRIDE = 112
# Cap the per-frame volume SSBO. The buffer is allocated once at this size; the
# march loop only iterates the live count uploaded each frame.
MAX_FOG_VOLUMES = 64
# Ray-march step count (matches the web shader's fixed 32).
STEP_COUNT = 32
[docs]
def build_volume_ssbo(volumes: list[Any]) -> tuple[np.ndarray, int]:
"""Pack ``FogVolume3D`` nodes into a flat std430 byte array.
Returns ``(bytes_array, count)`` where ``count`` is clamped to
``MAX_FOG_VOLUMES``. Each record is::
mat4 inv_transform (world → unit-local, column-major for GLSL)
vec4 albedo (rgb, a unused)
vec4 params (density, shape, edge_falloff, height_falloff)
vec4 extra (priority, 0, 0, 0)
``inv_transform`` folds the node's world transform with a per-axis scale of
``size * 0.5`` so the shape test in the shader is against a unit box / unit
sphere / unit cylinder regardless of the volume's placement.
"""
from simvx.core.math.matrices import mat4_from_trs
n = min(len(volumes), MAX_FOG_VOLUMES)
buf = np.zeros(max(1, n) * FOG_VOLUME_STRIDE, dtype=np.uint8)
f32 = buf.view(np.float32)
for i in range(n):
v = volumes[i]
size = v.size
ws = v.world_scale
half = np.array(
[max(float(size[0]) * 0.5 * float(ws[0]), 1e-4),
max(float(size[1]) * 0.5 * float(ws[1]), 1e-4),
max(float(size[2]) * 0.5 * float(ws[2]), 1e-4)],
dtype=np.float32,
)
# World transform of the node (row-major), with the per-axis scale set
# to half-extents so the shape test in the shader is against a unit
# box / sphere / cylinder regardless of placement.
model = mat4_from_trs(
v.world_position, v.world_rotation, half,
).astype(np.float32)
try:
inv = np.linalg.inv(model).astype(np.float32)
except np.linalg.LinAlgError:
inv = np.eye(4, dtype=np.float32)
base = i * FOG_VOLUME_STRIDE // 4
# GLSL/std430 mat4 is column-major; numpy is row-major → transpose.
f32[base:base + 16] = inv.T.ravel()
alb = v.albedo
f32[base + 16:base + 20] = [float(alb[0]), float(alb[1]), float(alb[2]),
float(alb[3]) if len(alb) > 3 else 1.0]
f32[base + 20:base + 24] = [
v.effective_density,
float(int(v.shape)),
float(v.falloff),
float(v.height_falloff),
]
f32[base + 24] = float(v.priority)
return buf, n
[docs]
class VolumetricFogPass:
"""Ray-marched single-scatter volumetric fog over the HDR target."""
def __init__(self, engine: Any) -> None:
self._engine = engine
self._ready = False
self.enabled = False
# Global fog settings (driven by WorldEnvironment via env sync).
self.density = 0.05
self.length = 64.0
self.anisotropy = 0.2
self.albedo: tuple[float, float, float, float] = (1.0, 1.0, 1.0, 1.0)
self.emission: tuple[float, float, float, float] = (0.0, 0.0, 0.0, 1.0)
self.gi_inject = 0.0
self.fog_height = 0.0
self.fog_height_density = 0.0
# Per-frame volume payload, set by the renderer before render().
self._volume_bytes: np.ndarray | None = None
self._volume_count = 0
# Per-frame sun + camera, set by the renderer before render().
self._sun_dir = np.array([0.577, 0.577, 0.577], dtype=np.float32)
self._sun_colour = np.array([1.0, 1.0, 1.0], dtype=np.float32)
self._sun_intensity = 1.0
self._inv_vp = np.eye(4, dtype=np.float32)
self._camera_pos = np.zeros(3, dtype=np.float32)
self._width = 0
self._height = 0
self._target: RenderTarget | None = None
self._sampler: Any = None
self._depth_sampler: Any = None
self._ubo_buf: Any = None
self._ubo_mem: Any = None
self._vol_buf: Any = None
self._vol_mem: Any = None
self._desc_layout: Any = None
self._desc_pool: Any = None
self._desc_set: Any = None
self._pipeline: Any = None
self._pipeline_layout: Any = None
self._vert_module: Any = None
self._frag_module: Any = None
[docs]
@property
def output_view(self) -> Any:
"""Colour view of the fog-composited HDR result (tonemap input)."""
return self._target.colour_view if self._target else None
[docs]
def setup(self, width: int, height: int, hdr_view: Any, depth_view: Any,
colour_format: int) -> None:
"""Allocate the alt HDR target, buffers, descriptors and pipeline."""
e = self._engine
device = e.ctx.device
self._width, self._height = width, height
self._target = RenderTarget(
device, e.ctx.physical_device, width, height,
colour_format=colour_format, use_depth=False,
queue=e.ctx.graphics_queue, command_pool=e.ctx.command_pool,
)
self._sampler = create_sampler(device)
self._depth_sampler = create_sampler(device, filter_mode=vk.VK_FILTER_NEAREST)
self._ubo_buf, self._ubo_mem = create_buffer(
device, e.ctx.physical_device, _UBO_SIZE,
vk.VK_BUFFER_USAGE_UNIFORM_BUFFER_BIT,
vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
)
self._vol_buf, self._vol_mem = create_buffer(
device, e.ctx.physical_device, MAX_FOG_VOLUMES * FOG_VOLUME_STRIDE,
vk.VK_BUFFER_USAGE_STORAGE_BUFFER_BIT,
vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
)
fs = vk.VK_SHADER_STAGE_FRAGMENT_BIT
self._desc_layout = create_descriptor_set_layout(device, [
(0, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, fs, 1), # hdr
(1, vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, fs, 1), # depth
(2, vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER, fs, 1), # uniforms
(3, vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, fs, 1), # fog volumes
])
pool_sizes = [
vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, descriptorCount=2),
vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER, descriptorCount=1),
vk.VkDescriptorPoolSize(type=vk.VK_DESCRIPTOR_TYPE_STORAGE_BUFFER, descriptorCount=1),
]
self._desc_pool = vk.vkCreateDescriptorPool(device, vk.VkDescriptorPoolCreateInfo(
maxSets=1, poolSizeCount=len(pool_sizes), pPoolSizes=pool_sizes,
), None)
self._desc_set = allocate_descriptor_set(device, self._desc_pool, self._desc_layout)
self._write_descriptors(hdr_view, depth_view)
shader_dir = e.shader_dir
self._vert_module = create_shader_module(device, compile_shader(shader_dir / "volumetric_fog.vert"))
self._frag_module = create_shader_module(device, compile_shader(shader_dir / "volumetric_fog.frag"))
self._create_pipeline(device, self._target.render_pass, (width, height))
self._ready = True
log.debug("Volumetric fog pass initialised (%dx%d, max %d volumes)", width, height, MAX_FOG_VOLUMES)
def _write_descriptors(self, hdr_view: Any, depth_view: Any) -> None:
with DescriptorWriteBatch(self._engine.ctx.device) as batch:
batch.image(self._desc_set, 0, hdr_view, self._sampler)
batch.image(self._desc_set, 1, depth_view, self._depth_sampler,
image_layout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL)
batch.uniform_buffer(self._desc_set, 2, self._ubo_buf, _UBO_SIZE)
batch.ssbo(self._desc_set, 3, self._vol_buf, MAX_FOG_VOLUMES * FOG_VOLUME_STRIDE)
# -- per-frame inputs ----------------------------------------------------
[docs]
def set_frame_data(self, view: np.ndarray, proj: np.ndarray,
sun_dir: np.ndarray | None, sun_colour: np.ndarray | None,
sun_intensity: float, volumes: list[Any]) -> None:
"""Stash camera / sun / volume data for the next ``render`` call."""
vp = (proj @ view).astype(np.float32)
try:
self._inv_vp = np.linalg.inv(vp).astype(np.float32)
except np.linalg.LinAlgError:
self._inv_vp = np.eye(4, dtype=np.float32)
self._camera_pos = extract_camera_position(view)
if sun_dir is not None and float(np.linalg.norm(sun_dir)) > 1e-6:
# The shader expects the direction TOWARD the sun; lights store the
# direction the light travels, so negate.
self._sun_dir = (-np.asarray(sun_dir, dtype=np.float32)).copy()
if sun_colour is not None:
self._sun_colour = np.asarray(sun_colour, dtype=np.float32)[:3].copy()
self._sun_intensity = float(sun_intensity)
self._volume_bytes, self._volume_count = build_volume_ssbo(volumes)
def _upload_uniforms(self) -> None:
e = self._engine
data = np.zeros(_UBO_SIZE // 4, dtype=np.float32)
# mat4 inv_vp (column-major for GLSL → transpose row-major numpy).
data[0:16] = self._inv_vp.T.ravel()
sd = self._sun_dir
data[16:20] = [sd[0], sd[1], sd[2], self._sun_intensity]
sc = self._sun_colour
data[20:24] = [sc[0], sc[1], sc[2], 0.0]
a = self.albedo
data[24:28] = [float(a[0]), float(a[1]), float(a[2]), 0.0]
em = self.emission
data[28:32] = [float(em[0]), float(em[1]), float(em[2]), 0.0]
data[32:36] = [float(self.density), float(self.length), float(self.anisotropy), float(self.gi_inject)]
cp = self._camera_pos
data[36:40] = [cp[0], cp[1], cp[2], 0.0]
data[40:44] = [float(self.fog_height), float(self.fog_height_density),
float(self._volume_count), float(STEP_COUNT)]
upload_numpy(e.ctx.device, self._ubo_mem, data)
if self._volume_bytes is not None and self._volume_count > 0:
upload_numpy(e.ctx.device, self._vol_mem,
self._volume_bytes[: self._volume_count * FOG_VOLUME_STRIDE])
# -- record --------------------------------------------------------------
[docs]
def render(self, cmd: Any) -> None:
"""Composite volumetric fog into the alt HDR target. No-op when off."""
if not self._ready or not self.enabled or not self._pipeline:
return
self._upload_uniforms()
rt = self._target
rp_begin = vk.VkRenderPassBeginInfo(
renderPass=rt.render_pass, framebuffer=rt.framebuffer,
renderArea=vk.VkRect2D(offset=vk.VkOffset2D(x=0, y=0),
extent=vk.VkExtent2D(width=rt.width, height=rt.height)),
clearValueCount=1,
pClearValues=[vk.VkClearValue(color=vk.VkClearColorValue(float32=[0.0, 0.0, 0.0, 1.0]))],
)
vk.vkCmdBeginRenderPass(cmd, rp_begin, vk.VK_SUBPASS_CONTENTS_INLINE)
vk.vkCmdSetViewport(cmd, 0, 1, [vk.VkViewport(
x=0.0, y=0.0, width=float(rt.width), height=float(rt.height), minDepth=0.0, maxDepth=1.0)])
vk.vkCmdSetScissor(cmd, 0, 1, [vk.VkRect2D(
offset=vk.VkOffset2D(x=0, y=0), extent=vk.VkExtent2D(width=rt.width, height=rt.height))])
vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline)
vk.vkCmdBindDescriptorSets(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS,
self._pipeline_layout, 0, 1, [self._desc_set], 0, None)
vk.vkCmdDraw(cmd, 3, 1, 0, 0)
vk.vkCmdEndRenderPass(cmd)
# -- pipeline ------------------------------------------------------------
def _create_pipeline(self, device: Any, render_pass: Any, extent: tuple[int, int]) -> None:
ffi = vk.ffi
layout_ci = ffi.new("VkPipelineLayoutCreateInfo*")
layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO
set_layouts = ffi.new("VkDescriptorSetLayout[1]", [self._desc_layout])
layout_ci.setLayoutCount = 1
layout_ci.pSetLayouts = set_layouts
layout_out = ffi.new("VkPipelineLayout*")
if vk._vulkan._callApi(vk._vulkan.lib.vkCreatePipelineLayout, device, layout_ci,
ffi.NULL, layout_out) != vk.VK_SUCCESS:
raise RuntimeError("vkCreatePipelineLayout failed (volumetric fog)")
self._pipeline_layout = layout_out[0]
pi = ffi.new("VkGraphicsPipelineCreateInfo*")
pi.sType = vk.VK_STRUCTURE_TYPE_GRAPHICS_PIPELINE_CREATE_INFO
stages = ffi.new("VkPipelineShaderStageCreateInfo[2]")
main_name = ffi.new("char[]", b"main")
stages[0].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
stages[0].stage = vk.VK_SHADER_STAGE_VERTEX_BIT
stages[0].module = self._vert_module
stages[0].pName = main_name
stages[1].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
stages[1].stage = vk.VK_SHADER_STAGE_FRAGMENT_BIT
stages[1].module = self._frag_module
stages[1].pName = main_name
pi.stageCount = 2
pi.pStages = stages
vi = ffi.new("VkPipelineVertexInputStateCreateInfo*")
vi.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VERTEX_INPUT_STATE_CREATE_INFO
pi.pVertexInputState = vi
ia = ffi.new("VkPipelineInputAssemblyStateCreateInfo*")
ia.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_INPUT_ASSEMBLY_STATE_CREATE_INFO
ia.topology = vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST
pi.pInputAssemblyState = ia
vps = ffi.new("VkPipelineViewportStateCreateInfo*")
vps.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VIEWPORT_STATE_CREATE_INFO
vps.viewportCount = 1
viewport = ffi.new("VkViewport*")
viewport.width = float(extent[0])
viewport.height = float(extent[1])
viewport.maxDepth = 1.0
vps.pViewports = viewport
scissor = ffi.new("VkRect2D*")
scissor.extent.width = extent[0]
scissor.extent.height = extent[1]
vps.scissorCount = 1
vps.pScissors = scissor
pi.pViewportState = vps
rs = ffi.new("VkPipelineRasterizationStateCreateInfo*")
rs.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_RASTERIZATION_STATE_CREATE_INFO
rs.polygonMode = vk.VK_POLYGON_MODE_FILL
rs.lineWidth = 1.0
rs.cullMode = vk.VK_CULL_MODE_NONE
pi.pRasterizationState = rs
ms = ffi.new("VkPipelineMultisampleStateCreateInfo*")
ms.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_MULTISAMPLE_STATE_CREATE_INFO
ms.rasterizationSamples = vk.VK_SAMPLE_COUNT_1_BIT
pi.pMultisampleState = ms
dss = ffi.new("VkPipelineDepthStencilStateCreateInfo*")
dss.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DEPTH_STENCIL_STATE_CREATE_INFO
dss.depthTestEnable = 0
dss.depthWriteEnable = 0
pi.pDepthStencilState = dss
cba = ffi.new("VkPipelineColorBlendAttachmentState*")
cba.colorWriteMask = (vk.VK_COLOR_COMPONENT_R_BIT | vk.VK_COLOR_COMPONENT_G_BIT
| vk.VK_COLOR_COMPONENT_B_BIT | vk.VK_COLOR_COMPONENT_A_BIT)
cb = ffi.new("VkPipelineColorBlendStateCreateInfo*")
cb.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_COLOR_BLEND_STATE_CREATE_INFO
cb.attachmentCount = 1
cb.pAttachments = cba
pi.pColorBlendState = cb
dyn_states = ffi.new("VkDynamicState[2]", [vk.VK_DYNAMIC_STATE_VIEWPORT, vk.VK_DYNAMIC_STATE_SCISSOR])
ds = ffi.new("VkPipelineDynamicStateCreateInfo*")
ds.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DYNAMIC_STATE_CREATE_INFO
ds.dynamicStateCount = 2
ds.pDynamicStates = dyn_states
pi.pDynamicState = ds
pi.layout = self._pipeline_layout
pi.renderPass = render_pass
pipeline_out = ffi.new("VkPipeline*")
if vk._vulkan._callApi(vk._vulkan.lib.vkCreateGraphicsPipelines, device, ffi.NULL, 1,
pi, ffi.NULL, pipeline_out) != vk.VK_SUCCESS:
raise RuntimeError("vkCreateGraphicsPipelines failed (volumetric fog)")
self._pipeline = pipeline_out[0]
# -- resize / cleanup ----------------------------------------------------
[docs]
def resize(self, width: int, height: int, hdr_view: Any, depth_view: Any) -> None:
if not self._ready:
return
device = self._engine.ctx.device
self._width, self._height = width, height
if self._target:
self._target.destroy()
self._target = RenderTarget(
device, self._engine.ctx.physical_device, width, height,
colour_format=vk.VK_FORMAT_R16G16B16A16_SFLOAT, use_depth=False,
queue=self._engine.ctx.graphics_queue, command_pool=self._engine.ctx.command_pool,
)
self._write_descriptors(hdr_view, depth_view)
[docs]
def cleanup(self) -> None:
if not self._ready:
return
device = self._engine.ctx.device
if self._pipeline:
vk.vkDestroyPipeline(device, self._pipeline, None)
if self._pipeline_layout:
vk.vkDestroyPipelineLayout(device, self._pipeline_layout, None)
if self._vert_module:
vk.vkDestroyShaderModule(device, self._vert_module, None)
if self._frag_module:
vk.vkDestroyShaderModule(device, self._frag_module, None)
if self._desc_pool:
vk.vkDestroyDescriptorPool(device, self._desc_pool, None)
if self._desc_layout:
vk.vkDestroyDescriptorSetLayout(device, self._desc_layout, None)
if self._sampler:
vk.vkDestroySampler(device, self._sampler, None)
if self._depth_sampler:
vk.vkDestroySampler(device, self._depth_sampler, None)
if self._ubo_buf:
vk.vkDestroyBuffer(device, self._ubo_buf, None)
if self._ubo_mem:
vk.vkFreeMemory(device, self._ubo_mem, None)
if self._vol_buf:
vk.vkDestroyBuffer(device, self._vol_buf, None)
if self._vol_mem:
vk.vkFreeMemory(device, self._vol_mem, None)
if self._target:
self._target.destroy()
self._ready = False