"""Custom user-defined post-processing passes — fullscreen shader effects.
Each PostProcessEffect from core is compiled into a Vulkan pipeline that reads
the current framebuffer and writes to a ping-pong render target. Effects run
after built-in post-processing (bloom, SSAO) but before tone mapping, operating
in HDR space (R16G16B16A16_SFLOAT).
"""
from __future__ import annotations
import logging
import struct
import tempfile
import time
from pathlib import Path
from typing import Any
import numpy as np
import vulkan as vk
from simvx.core.world_environment import PostProcessEffect
from ..gpu.memory import create_buffer, create_sampler
from ..gpu.pipeline import create_shader_module
from ..materials.shader_compiler import compile_shader
from .render_target import RenderTarget
__all__ = ["CustomPostProcessPass"]
log = logging.getLogger(__name__)
# Fragment shader wrapper — user code is inserted into main().
# The vertex shader is shared with the tone-mapping pass (tonemap.vert)
# which outputs ``frag_uv`` at location 0.
_FRAG_TEMPLATE = """\
#version 450
layout(set = 0, binding = 0) uniform sampler2D u_colour_tex;
layout(set = 0, binding = 1) uniform sampler2D u_depth_tex;
layout(push_constant) uniform PushConstants {{
vec2 u_resolution;
float u_time;
float _pad0;
}} pc;
// Convenience aliases so user code can reference these as globals
#define u_resolution pc.u_resolution
#define u_time pc.u_time
layout(location = 0) in vec2 frag_uv;
layout(location = 0) out vec4 frag_color;
// Legacy alias — user code may reference v_uv
#define v_uv frag_uv
{user_uniforms}
{user_code}
"""
_PC_SIZE = 16 # vec2(8) + float(4) + pad(4)
def _build_uniform_block(effect: PostProcessEffect) -> str:
"""Generate a GLSL uniform block declaration from effect uniforms."""
if not effect._uniforms:
return ""
lines = ["layout(set = 0, binding = 2) uniform UserUniforms {"]
for name in sorted(effect._uniforms):
utype = effect._uniform_types.get(name, "float")
lines.append(f" {utype} {name};")
lines.append("} u_user;")
# Also emit #defines so user code can reference uniforms without the u_user. prefix
for name in sorted(effect._uniforms):
lines.append(f"#define {name} u_user.{name}")
return "\n".join(lines)
def _wrap_fragment_source(effect: PostProcessEffect) -> str:
"""Wrap user fragment code with the standard preamble."""
uniform_block = _build_uniform_block(effect)
return _FRAG_TEMPLATE.format(user_uniforms=uniform_block, user_code=effect.shader_code)
def _compile_glsl_source(source: str, stage_ext: str) -> Path:
"""Write GLSL to a temp file and compile to SPIR-V."""
tmp = Path(tempfile.mktemp(suffix=stage_ext))
tmp.write_text(source)
try:
return compile_shader(tmp)
finally:
tmp.unlink(missing_ok=True)
def _std140_alignment(utype: str) -> int:
if utype in ("float", "int", "uint"):
return 4
if utype in ("vec2", "ivec2"):
return 8
return 16 # vec3, vec4, mat4
def _pack_uniform_value(value: Any, utype: str) -> bytes:
"""Pack a single uniform value to bytes."""
if utype == "float":
return struct.pack("f", float(value))
if utype == "int":
return struct.pack("i", int(value))
if utype == "uint":
return struct.pack("I", int(value))
if utype == "vec2":
v = value if isinstance(value, (tuple, list)) else list(value)
return struct.pack("2f", *v[:2])
if utype == "vec3":
v = value if isinstance(value, (tuple, list)) else list(value)
return struct.pack("3f", *v[:3])
if utype == "vec4":
v = value if isinstance(value, (tuple, list)) else list(value)
return struct.pack("4f", *v[:4])
if utype == "mat4":
arr = np.asarray(value, dtype=np.float32).ravel()
return arr.tobytes()[:64]
return struct.pack("f", float(value))
class _EffectState:
"""GPU resources for a single compiled PostProcessEffect."""
__slots__ = (
"effect", "frag_module", "pipeline", "pipeline_layout",
"desc_pool", "desc_layout", "desc_set",
"ubo_buf", "ubo_mem", "ubo_size",
"shader_hash",
)
def __init__(self) -> None:
self.effect: PostProcessEffect | None = None
self.frag_module: Any = None
self.pipeline: Any = None
self.pipeline_layout: Any = None
self.desc_pool: Any = None
self.desc_layout: Any = None
self.desc_set: Any = None
self.ubo_buf: Any = None
self.ubo_mem: Any = None
self.ubo_size: int = 0
self.shader_hash: int = 0
def cleanup(self, device: Any) -> None:
if self.pipeline:
vk.vkDestroyPipeline(device, self.pipeline, None)
if self.pipeline_layout:
vk.vkDestroyPipelineLayout(device, self.pipeline_layout, None)
if self.frag_module:
vk.vkDestroyShaderModule(device, self.frag_module, None)
if self.desc_pool:
vk.vkDestroyDescriptorPool(device, self.desc_pool, None)
if self.desc_layout:
vk.vkDestroyDescriptorSetLayout(device, self.desc_layout, None)
if self.ubo_buf:
vk.vkDestroyBuffer(device, self.ubo_buf, None)
if self.ubo_mem:
vk.vkFreeMemory(device, self.ubo_mem, None)
[docs]
class CustomPostProcessPass:
"""Manages custom user post-process effects as fullscreen Vulkan passes.
Ping-pongs between two render targets so multiple effects can chain.
Integrates with the existing PostProcessPass pipeline.
"""
def __init__(self, engine: Any):
self._engine = engine
self._start_time = time.perf_counter()
# Shared vertex shader module (fullscreen triangle)
self._vert_module: Any = None
# Shared sampler for colour/depth textures
self._sampler: Any = None
self._depth_sampler: Any = None
# Ping-pong render targets (same format as swapchain for LDR output)
self._rt_a: RenderTarget | None = None
self._rt_b: RenderTarget | None = None
# Per-effect GPU state
self._states: list[_EffectState] = []
self._ready = False
[docs]
def setup(self) -> None:
"""Create shared GPU resources."""
e = self._engine
device = e.ctx.device
w, h = e.extent
# Re-use the tonemap fullscreen triangle vertex shader (same output varying)
vert_spv = compile_shader(e.shader_dir / "tonemap.vert")
self._vert_module = create_shader_module(device, vert_spv)
# Samplers
self._sampler = create_sampler(device)
self._depth_sampler = vk.vkCreateSampler(device, vk.VkSamplerCreateInfo(
magFilter=vk.VK_FILTER_LINEAR,
minFilter=vk.VK_FILTER_LINEAR,
mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_NEAREST,
addressModeU=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
addressModeV=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
addressModeW=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
minLod=0.0, maxLod=0.0,
), None)
# Ping-pong targets — HDR (R16G16B16A16_SFLOAT) so effects run before tone mapping
hdr_fmt = vk.VK_FORMAT_R16G16B16A16_SFLOAT
self._rt_a = RenderTarget(device, e.ctx.physical_device, w, h, color_format=hdr_fmt, use_depth=False)
self._rt_b = RenderTarget(device, e.ctx.physical_device, w, h, color_format=hdr_fmt, use_depth=False)
self._ready = True
log.debug("CustomPostProcessPass initialised (%dx%d)", w, h)
[docs]
def sync_effects(self, effects: list[PostProcessEffect]) -> None:
"""Synchronise GPU state with the current list of PostProcessEffects.
Creates/destroys pipelines as effects are added/removed/changed.
"""
if not self._ready:
return
device = self._engine.ctx.device
# Build set of current effect ids
current_ids = {id(e) for e in effects}
# Remove states for effects that no longer exist
kept: list[_EffectState] = []
for st in self._states:
if id(st.effect) not in current_ids:
st.cleanup(device)
else:
kept.append(st)
self._states = kept
# Ensure each effect has a state
existing = {id(st.effect): st for st in self._states}
for effect in effects:
eid = id(effect)
if eid in existing:
st = existing[eid]
# Recompile if shader changed
code_hash = hash(effect.shader_code)
if code_hash != st.shader_hash:
st.cleanup(device)
self._states.remove(st)
self._build_effect_state(effect)
else:
# Update UBO if uniforms changed
self._update_ubo(st)
else:
self._build_effect_state(effect)
# Sort states by effect order
self._states.sort(key=lambda s: s.effect.order if s.effect else 0)
def _build_effect_state(self, effect: PostProcessEffect) -> None:
"""Compile and create all GPU resources for a single effect."""
e = self._engine
device = e.ctx.device
st = _EffectState()
st.effect = effect
st.shader_hash = hash(effect.shader_code)
# Compile fragment shader
frag_source = _wrap_fragment_source(effect)
try:
frag_spv = _compile_glsl_source(frag_source, ".frag")
except Exception:
log.exception("Failed to compile custom post-process shader")
return
st.frag_module = create_shader_module(device, frag_spv)
# Descriptor layout: binding 0 = colour, 1 = depth, (optional) 2 = UBO
has_uniforms = bool(effect._uniforms)
bindings = [
vk.VkDescriptorSetLayoutBinding(
binding=i, descriptorType=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER,
descriptorCount=1, stageFlags=vk.VK_SHADER_STAGE_FRAGMENT_BIT,
)
for i in range(2)
]
if has_uniforms:
bindings.append(vk.VkDescriptorSetLayoutBinding(
binding=2, descriptorType=vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER,
descriptorCount=1, stageFlags=vk.VK_SHADER_STAGE_FRAGMENT_BIT,
))
st.desc_layout = vk.vkCreateDescriptorSetLayout(device,
vk.VkDescriptorSetLayoutCreateInfo(bindingCount=len(bindings), pBindings=bindings), None)
# Descriptor pool
pool_sizes = [vk.VkDescriptorPoolSize(
type=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER, descriptorCount=2,
)]
if has_uniforms:
pool_sizes.append(vk.VkDescriptorPoolSize(
type=vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER, descriptorCount=1,
))
st.desc_pool = vk.vkCreateDescriptorPool(device,
vk.VkDescriptorPoolCreateInfo(maxSets=1, poolSizeCount=len(pool_sizes), pPoolSizes=pool_sizes), None)
# Allocate descriptor set
sets = vk.vkAllocateDescriptorSets(device, vk.VkDescriptorSetAllocateInfo(
descriptorPool=st.desc_pool, descriptorSetCount=1, pSetLayouts=[st.desc_layout],
))
st.desc_set = sets[0]
# Create UBO if needed
if has_uniforms:
ubo_size = self._calc_ubo_size(effect)
st.ubo_size = max(ubo_size, 16) # Minimum 16 bytes
st.ubo_buf, st.ubo_mem = create_buffer(
device, e.ctx.physical_device, st.ubo_size,
vk.VK_BUFFER_USAGE_UNIFORM_BUFFER_BIT,
vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
)
self._update_ubo(st)
# Create pipeline
st.pipeline, st.pipeline_layout = self._create_pipeline(device, st)
effect.clear_dirty()
self._states.append(st)
log.debug("Built custom post-process pipeline (order=%d, uniforms=%d)", effect.order, len(effect._uniforms))
def _calc_ubo_size(self, effect: PostProcessEffect) -> int:
"""Calculate std140 UBO size for an effect's uniforms."""
offset = 0
for name in sorted(effect._uniforms):
utype = effect._uniform_types.get(name, "float")
alignment = _std140_alignment(utype)
offset = (offset + alignment - 1) & ~(alignment - 1)
_, size = {
"float": ("f", 4), "int": ("i", 4), "uint": ("I", 4),
"vec2": ("2f", 8), "vec3": ("3f", 12), "vec4": ("4f", 16), "mat4": ("16f", 64),
}.get(utype, ("f", 4))
offset += size
return offset
def _update_ubo(self, st: _EffectState) -> None:
"""Upload uniform values to the effect's UBO."""
if not st.ubo_buf or not st.effect or not st.effect._uniforms:
return
data = bytearray()
offset = 0
for name in sorted(st.effect._uniforms):
value = st.effect._uniforms[name]
utype = st.effect._uniform_types.get(name, "float")
alignment = _std140_alignment(utype)
aligned = (offset + alignment - 1) & ~(alignment - 1)
data.extend(b"\x00" * (aligned - offset))
offset = aligned
packed = _pack_uniform_value(value, utype)
data.extend(packed)
offset += len(packed)
if not data or offset > st.ubo_size:
return
buf = bytes(data)
device = self._engine.ctx.device
src = vk.ffi.from_buffer(buf)
dst = vk.vkMapMemory(device, st.ubo_mem, 0, len(buf), 0)
vk.ffi.memmove(dst, src, len(buf))
vk.vkUnmapMemory(device, st.ubo_mem)
def _write_descriptors(self, st: _EffectState, color_view: Any, depth_view: Any) -> None:
"""Write colour/depth/UBO descriptors for an effect."""
device = self._engine.ctx.device
color_info = vk.VkDescriptorImageInfo(
sampler=self._sampler, imageView=color_view,
imageLayout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
)
depth_info = vk.VkDescriptorImageInfo(
sampler=self._depth_sampler, imageView=depth_view,
imageLayout=vk.VK_IMAGE_LAYOUT_DEPTH_STENCIL_READ_ONLY_OPTIMAL,
)
writes = [
vk.VkWriteDescriptorSet(
dstSet=st.desc_set, dstBinding=0, dstArrayElement=0,
descriptorCount=1, descriptorType=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER,
pImageInfo=[color_info],
),
vk.VkWriteDescriptorSet(
dstSet=st.desc_set, dstBinding=1, dstArrayElement=0,
descriptorCount=1, descriptorType=vk.VK_DESCRIPTOR_TYPE_COMBINED_IMAGE_SAMPLER,
pImageInfo=[depth_info],
),
]
if st.ubo_buf:
ubo_info = vk.VkDescriptorBufferInfo(buffer=st.ubo_buf, offset=0, range=st.ubo_size)
writes.append(vk.VkWriteDescriptorSet(
dstSet=st.desc_set, dstBinding=2, dstArrayElement=0,
descriptorCount=1, descriptorType=vk.VK_DESCRIPTOR_TYPE_UNIFORM_BUFFER,
pBufferInfo=[ubo_info],
))
vk.vkUpdateDescriptorSets(device, len(writes), writes, 0, None)
def _create_pipeline(self, device: Any, st: _EffectState) -> tuple[Any, Any]:
"""Create a fullscreen graphics pipeline for this effect."""
ffi = vk.ffi
e = self._engine
# Push constants
push_range = ffi.new("VkPushConstantRange*")
push_range.stageFlags = vk.VK_SHADER_STAGE_FRAGMENT_BIT
push_range.offset = 0
push_range.size = _PC_SIZE
# Pipeline layout
set_layouts = ffi.new("VkDescriptorSetLayout[1]", [st.desc_layout])
layout_ci = ffi.new("VkPipelineLayoutCreateInfo*")
layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO
layout_ci.setLayoutCount = 1
layout_ci.pSetLayouts = set_layouts
layout_ci.pushConstantRangeCount = 1
layout_ci.pPushConstantRanges = push_range
layout_out = ffi.new("VkPipelineLayout*")
result = vk._vulkan._callApi(
vk._vulkan.lib.vkCreatePipelineLayout, device, layout_ci, ffi.NULL, layout_out)
if result != vk.VK_SUCCESS:
raise RuntimeError(f"vkCreatePipelineLayout failed: {result}")
pipeline_layout = layout_out[0]
# Use the ping-pong render target's render pass
render_pass = self._rt_a.render_pass if self._rt_a else e.render_pass
w, h = e.extent
# Pipeline create info
pi = ffi.new("VkGraphicsPipelineCreateInfo*")
pi.sType = vk.VK_STRUCTURE_TYPE_GRAPHICS_PIPELINE_CREATE_INFO
stages = ffi.new("VkPipelineShaderStageCreateInfo[2]")
main_name = ffi.new("char[]", b"main")
stages[0].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
stages[0].stage = vk.VK_SHADER_STAGE_VERTEX_BIT
stages[0].module = self._vert_module
stages[0].pName = main_name
stages[1].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
stages[1].stage = vk.VK_SHADER_STAGE_FRAGMENT_BIT
stages[1].module = st.frag_module
stages[1].pName = main_name
pi.stageCount = 2
pi.pStages = stages
vi = ffi.new("VkPipelineVertexInputStateCreateInfo*")
vi.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VERTEX_INPUT_STATE_CREATE_INFO
pi.pVertexInputState = vi
ia = ffi.new("VkPipelineInputAssemblyStateCreateInfo*")
ia.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_INPUT_ASSEMBLY_STATE_CREATE_INFO
ia.topology = vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST
pi.pInputAssemblyState = ia
vps = ffi.new("VkPipelineViewportStateCreateInfo*")
vps.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VIEWPORT_STATE_CREATE_INFO
vps.viewportCount = 1
viewport = ffi.new("VkViewport*")
viewport.width = float(w)
viewport.height = float(h)
viewport.maxDepth = 1.0
vps.pViewports = viewport
scissor = ffi.new("VkRect2D*")
scissor.extent.width = w
scissor.extent.height = h
vps.scissorCount = 1
vps.pScissors = scissor
pi.pViewportState = vps
rs = ffi.new("VkPipelineRasterizationStateCreateInfo*")
rs.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_RASTERIZATION_STATE_CREATE_INFO
rs.polygonMode = vk.VK_POLYGON_MODE_FILL
rs.lineWidth = 1.0
rs.cullMode = vk.VK_CULL_MODE_NONE
pi.pRasterizationState = rs
ms = ffi.new("VkPipelineMultisampleStateCreateInfo*")
ms.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_MULTISAMPLE_STATE_CREATE_INFO
ms.rasterizationSamples = vk.VK_SAMPLE_COUNT_1_BIT
pi.pMultisampleState = ms
dss = ffi.new("VkPipelineDepthStencilStateCreateInfo*")
dss.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DEPTH_STENCIL_STATE_CREATE_INFO
dss.depthTestEnable = 0
dss.depthWriteEnable = 0
pi.pDepthStencilState = dss
cba = ffi.new("VkPipelineColorBlendAttachmentState*")
cba.colorWriteMask = (
vk.VK_COLOR_COMPONENT_R_BIT | vk.VK_COLOR_COMPONENT_G_BIT
| vk.VK_COLOR_COMPONENT_B_BIT | vk.VK_COLOR_COMPONENT_A_BIT
)
cb = ffi.new("VkPipelineColorBlendStateCreateInfo*")
cb.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_COLOR_BLEND_STATE_CREATE_INFO
cb.attachmentCount = 1
cb.pAttachments = cba
pi.pColorBlendState = cb
dyn_states = ffi.new("VkDynamicState[2]", [vk.VK_DYNAMIC_STATE_VIEWPORT, vk.VK_DYNAMIC_STATE_SCISSOR])
ds = ffi.new("VkPipelineDynamicStateCreateInfo*")
ds.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DYNAMIC_STATE_CREATE_INFO
ds.dynamicStateCount = 2
ds.pDynamicStates = dyn_states
pi.pDynamicState = ds
pi.layout = pipeline_layout
pi.renderPass = render_pass
pipeline_out = ffi.new("VkPipeline*")
result = vk._vulkan._callApi(
vk._vulkan.lib.vkCreateGraphicsPipelines, device, ffi.NULL, 1, pi, ffi.NULL, pipeline_out)
if result != vk.VK_SUCCESS:
raise RuntimeError(f"vkCreateGraphicsPipelines failed: {result}")
return pipeline_out[0], pipeline_layout
[docs]
def render(self, cmd: Any, input_color_view: Any, depth_view: Any,
width: int, height: int) -> Any:
"""Execute all enabled custom effects, ping-ponging between targets.
Args:
cmd: Vulkan command buffer (must be outside any render pass).
input_color_view: Image view of the current frame (post-tonemap).
depth_view: Depth buffer image view.
width: Framebuffer width.
height: Framebuffer height.
Returns:
The image view of the final result (for blitting to swapchain), or
None if no effects ran.
"""
if not self._ready or not self._states:
return None
enabled = [s for s in self._states if s.effect and s.effect.enabled]
if not enabled:
return None
elapsed = time.perf_counter() - self._start_time
current_view = input_color_view
targets = [self._rt_a, self._rt_b]
target_idx = 0
for st in enabled:
target = targets[target_idx]
if not target:
continue
# Update descriptors to point at current input
self._write_descriptors(st, current_view, depth_view)
# Update uniforms
if st.effect.dirty:
self._update_ubo(st)
st.effect.clear_dirty()
# Begin render pass on target
clear = vk.VkClearValue(color=vk.VkClearColorValue(float32=[0, 0, 0, 1]))
rp_begin = vk.VkRenderPassBeginInfo(
renderPass=target.render_pass, framebuffer=target.framebuffer,
renderArea=vk.VkRect2D(
offset=vk.VkOffset2D(x=0, y=0),
extent=vk.VkExtent2D(width=target.width, height=target.height),
),
clearValueCount=1, pClearValues=[clear],
)
vk.vkCmdBeginRenderPass(cmd, rp_begin, vk.VK_SUBPASS_CONTENTS_INLINE)
# Set dynamic viewport/scissor
vk_viewport = vk.VkViewport(
x=0, y=0, width=float(width), height=float(height), minDepth=0, maxDepth=1,
)
vk.vkCmdSetViewport(cmd, 0, 1, [vk_viewport])
scissor = vk.VkRect2D(
offset=vk.VkOffset2D(x=0, y=0),
extent=vk.VkExtent2D(width=width, height=height),
)
vk.vkCmdSetScissor(cmd, 0, 1, [scissor])
# Bind pipeline and descriptors
vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, st.pipeline)
vk.vkCmdBindDescriptorSets(
cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, st.pipeline_layout,
0, 1, [st.desc_set], 0, None,
)
# Push constants: resolution + time
pc_data = bytearray(_PC_SIZE)
pc_data[0:8] = np.array([float(width), float(height)], dtype=np.float32).tobytes()
pc_data[8:12] = np.array([elapsed], dtype=np.float32).tobytes()
ffi = vk.ffi
cbuf = ffi.new("char[]", bytes(pc_data))
vk._vulkan.lib.vkCmdPushConstants(
cmd, st.pipeline_layout, vk.VK_SHADER_STAGE_FRAGMENT_BIT, 0, _PC_SIZE, cbuf)
# Draw fullscreen triangle
vk.vkCmdDraw(cmd, 3, 1, 0, 0)
vk.vkCmdEndRenderPass(cmd)
# Next effect reads from this target's output
current_view = target.color_view
target_idx = 1 - target_idx
return current_view
@property
def has_effects(self) -> bool:
return any(s.effect and s.effect.enabled for s in self._states)
[docs]
def resize(self, width: int, height: int) -> None:
"""Recreate ping-pong targets and all pipelines for new dimensions."""
if not self._ready:
return
device = self._engine.ctx.device
# Destroy and recreate render targets
if self._rt_a:
self._rt_a.destroy()
if self._rt_b:
self._rt_b.destroy()
hdr_fmt = vk.VK_FORMAT_R16G16B16A16_SFLOAT
self._rt_a = RenderTarget(device, self._engine.ctx.physical_device, width, height,
color_format=hdr_fmt, use_depth=False)
self._rt_b = RenderTarget(device, self._engine.ctx.physical_device, width, height,
color_format=hdr_fmt, use_depth=False)
# Recreate all pipelines (render pass changed)
for st in self._states:
if st.pipeline:
vk.vkDestroyPipeline(device, st.pipeline, None)
if st.pipeline_layout:
vk.vkDestroyPipelineLayout(device, st.pipeline_layout, None)
st.pipeline, st.pipeline_layout = self._create_pipeline(device, st)
[docs]
def cleanup(self) -> None:
"""Release all GPU resources."""
if not self._ready:
return
device = self._engine.ctx.device
for st in self._states:
st.cleanup(device)
self._states.clear()
if self._vert_module:
vk.vkDestroyShaderModule(device, self._vert_module, None)
if self._sampler:
vk.vkDestroySampler(device, self._sampler, None)
if self._depth_sampler:
vk.vkDestroySampler(device, self._depth_sampler, None)
if self._rt_a:
self._rt_a.destroy()
if self._rt_b:
self._rt_b.destroy()
self._ready = False