Source code for simvx.graphics.renderer.light2d_pass

"""2D light accumulation pass — renders Light2D nodes as additive radial gradients.

Renders each light as a screen-space quad with radial falloff. The accumulated
light texture is provided to Draw2DPass for final compositing (multiply blend).

Shadow casting uses a simple 1D angular shadow map per light: for each angle,
a ray is cast from the light center and the nearest occluder intersection
distance is stored. The fragment shader compares its distance against this
map to determine if it is in shadow.
"""

from __future__ import annotations

import logging
import math
import struct
from typing import Any

import numpy as np
import vulkan as vk

from .._types import SHADER_DIR
from ..gpu.memory import create_image
from ..gpu.pipeline import create_shader_module
from ..materials.shader_compiler import compile_shader
from .passes import create_offscreen_pass

__all__ = ["Light2DPass"]

log = logging.getLogger(__name__)

# Push constant layout must match light2d.vert/frag LightPush struct
# vec2 screen_size (8) + vec2 light_pos (8) + float range (4) + float falloff (4)
# + float energy (4) + float pad (4) + vec4 light_colour (16) = 48 bytes
PUSH_SIZE = 48

# Shadow map resolution (1D angular samples per light)
SHADOW_MAP_RESOLUTION = 64


[docs] class Light2DPass: """GPU pass that renders 2D lights to an offscreen accumulation texture. Usage from the forward renderer: 1. ``begin_frame()`` — clears per-frame submission lists 2. ``submit_light(...)`` / ``submit_occluder(...)`` — queue data 3. ``render(cmd, extent)`` — render all lights to accumulation RT 4. ``get_light_texture_view()`` — returns the image view for compositing """ def __init__(self, engine: Any): self._engine = engine # Pipeline resources self._pipeline: Any = None self._pipeline_layout: Any = None self._vert_module: Any = None self._frag_module: Any = None # Offscreen render target self._rt_image: Any = None self._rt_memory: Any = None self._rt_view: Any = None self._rt_render_pass: Any = None self._rt_framebuffer: Any = None self._rt_sampler: Any = None self._rt_width = 0 self._rt_height = 0 # Per-frame submissions self._lights: list[dict] = [] self._occluders: list[list[tuple[float, float]]] = [] # Shadow map (CPU-computed, uploaded as 1D texture per light) self._shadow_map: np.ndarray = np.ones(SHADOW_MAP_RESOLUTION, dtype=np.float32) self._ready = False
[docs] def setup(self) -> None: """Create GPU resources: shaders, pipeline, render target.""" e = self._engine device = e.ctx.device phys = e.ctx.physical_device w, h = e.extent # Compile light2d shaders vert_spv = compile_shader(SHADER_DIR / "light2d.vert") frag_spv = compile_shader(SHADER_DIR / "light2d.frag") self._vert_module = create_shader_module(device, vert_spv) self._frag_module = create_shader_module(device, frag_spv) # Create offscreen render target (additive accumulation) self._create_render_target(device, phys, w, h) # Create pipeline with additive blending self._pipeline, self._pipeline_layout = _create_light2d_pipeline( device, self._vert_module, self._frag_module, self._rt_render_pass, (w, h), ) # Sampler for reading the accumulation texture sampler_info = vk.VkSamplerCreateInfo( magFilter=vk.VK_FILTER_LINEAR, minFilter=vk.VK_FILTER_LINEAR, addressModeU=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE, addressModeV=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE, addressModeW=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE, anisotropyEnable=vk.VK_FALSE, unnormalizedCoordinates=vk.VK_FALSE, compareEnable=vk.VK_FALSE, mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_LINEAR, ) self._rt_sampler = vk.vkCreateSampler(device, sampler_info, None) self._ready = True log.debug("Light2DPass setup complete (%dx%d)", w, h)
def _create_render_target(self, device: Any, phys: Any, w: int, h: int) -> None: """Create the offscreen render target for light accumulation.""" self._rt_width = w self._rt_height = h fmt = vk.VK_FORMAT_R16G16B16A16_SFLOAT # HDR accumulation self._rt_image, self._rt_memory = create_image( device, phys, w, h, fmt, vk.VK_IMAGE_USAGE_COLOR_ATTACHMENT_BIT | vk.VK_IMAGE_USAGE_SAMPLED_BIT, ) self._rt_view = vk.vkCreateImageView( device, vk.VkImageViewCreateInfo( image=self._rt_image, viewType=vk.VK_IMAGE_VIEW_TYPE_2D, format=fmt, subresourceRange=vk.VkImageSubresourceRange( aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT, baseMipLevel=0, levelCount=1, baseArrayLayer=0, layerCount=1, ), ), None, ) # Render pass: clear to black, store result as shader-readable self._rt_render_pass = create_offscreen_pass(device, fmt, depth_format=0) self._rt_framebuffer = vk.vkCreateFramebuffer( device, vk.VkFramebufferCreateInfo( renderPass=self._rt_render_pass, attachmentCount=1, pAttachments=[self._rt_view], width=w, height=h, layers=1, ), None, )
[docs] def submit_light( self, position: tuple[float, float], colour: tuple[float, float, float], energy: float, light_range: float, falloff: float = 1.0, blend_mode: str = "add", shadow_enabled: bool = False, shadow_colour: tuple[float, ...] = (0.0, 0.0, 0.0, 0.5), ) -> None: """Queue a light for rendering this frame.""" self._lights.append( { "position": position, "colour": colour, "energy": energy, "range": light_range, "falloff": falloff, "blend_mode": blend_mode, "shadow_enabled": shadow_enabled, "shadow_colour": shadow_colour, } )
[docs] def submit_occluder(self, polygon_vertices: list[tuple[float, float]]) -> None: """Queue an occluder polygon for shadow casting this frame.""" if len(polygon_vertices) >= 2: self._occluders.append(polygon_vertices)
[docs] def begin_frame(self) -> None: """Clear per-frame submission lists.""" self._lights.clear() self._occluders.clear()
[docs] def render(self, cmd: Any, extent: tuple[int, int]) -> None: """Render all queued lights to the accumulation render target. Must be called outside the main render pass (in pre_render phase). """ if not self._ready or not self._lights: return w, h = extent # Resize render target if window changed if w != self._rt_width or h != self._rt_height: self._destroy_render_target() self._create_render_target( self._engine.ctx.device, self._engine.ctx.physical_device, w, h, ) # Recreate pipeline for new extent device = self._engine.ctx.device if self._pipeline: vk.vkDestroyPipeline(device, self._pipeline, None) if self._pipeline_layout: vk.vkDestroyPipelineLayout(device, self._pipeline_layout, None) self._pipeline, self._pipeline_layout = _create_light2d_pipeline( device, self._vert_module, self._frag_module, self._rt_render_pass, (w, h), ) # Begin offscreen render pass (clears to black) clear_value = vk.VkClearValue( color=vk.VkClearColorValue(float32=[0.0, 0.0, 0.0, 0.0]), ) begin_info = vk.VkRenderPassBeginInfo( renderPass=self._rt_render_pass, framebuffer=self._rt_framebuffer, renderArea=vk.VkRect2D( offset=vk.VkOffset2D(x=0, y=0), extent=vk.VkExtent2D(width=w, height=h), ), clearValueCount=1, pClearValues=[clear_value], ) vk.vkCmdBeginRenderPass(cmd, begin_info, vk.VK_SUBPASS_CONTENTS_INLINE) vk_viewport = vk.VkViewport( x=0.0, y=0.0, width=float(w), height=float(h), minDepth=0.0, maxDepth=1.0, ) scissor = vk.VkRect2D( offset=vk.VkOffset2D(x=0, y=0), extent=vk.VkExtent2D(width=w, height=h), ) vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline) vk.vkCmdSetViewport(cmd, 0, 1, [vk_viewport]) vk.vkCmdSetScissor(cmd, 0, 1, [scissor]) # Render each light as a fullscreen quad with push constants for light in self._lights: shadow_mult = 1.0 if light["shadow_enabled"] and self._occluders: shadow_mult = self._compute_shadow_multiplier(light) px, py = light["position"] r, g, b = light["colour"] energy = light["energy"] * shadow_mult lr = light["range"] falloff = light.get("falloff", 1.0) # Pack push constants: must match LightPush struct layout push_data = struct.pack( "ff ff ffff ffff", float(w), float(h), # screen_size px, py, # light_pos lr, falloff, energy, 0.0, # range, falloff, energy, pad r, g, b, 1.0, # light_colour ) self._engine.push_constants(cmd, self._pipeline_layout, push_data) vk.vkCmdDraw(cmd, 6, 1, 0, 0) # 6 verts = fullscreen quad vk.vkCmdEndRenderPass(cmd)
def _compute_shadow_multiplier(self, light: dict) -> float: """Vectorised CPU shadow estimation using numpy. Casts SHADOW_MAP_RESOLUTION rays from the light centre and tests each against all nearby occluder edges in a single batched operation. """ lx, ly = light["position"] lr = light["range"] lr_sq = lr * lr # Pre-filter: collect edges from occluders whose AABB overlaps the light circle edges_a: list[tuple[float, float]] = [] edges_e: list[tuple[float, float]] = [] # edge vectors (bx-ax, by-ay) for polygon in self._occluders: xs = [p[0] for p in polygon] ys = [p[1] for p in polygon] mn_x, mx_x, mn_y, mx_y = min(xs), max(xs), min(ys), max(ys) cx = max(mn_x, min(lx, mx_x)) cy = max(mn_y, min(ly, mx_y)) if (cx - lx) ** 2 + (cy - ly) ** 2 > lr_sq: continue n = len(polygon) for i in range(n): ax, ay = polygon[i] bx, by = polygon[(i + 1) % n] edges_a.append((ax, ay)) edges_e.append((bx - ax, by - ay)) if not edges_a: return 1.0 # Build numpy arrays: edges (E,2) and ray directions (R,2) ea = np.array(edges_a, dtype=np.float32) # (E, 2) edge start points ee = np.array(edges_e, dtype=np.float32) # (E, 2) edge vectors n_rays = SHADOW_MAP_RESOLUTION angles = np.linspace(0, math.tau, n_rays, endpoint=False, dtype=np.float32) dirs = np.column_stack((np.cos(angles), np.sin(angles))) # (R, 2) # Vectorised ray-segment intersection (all rays × all edges) # For ray (lx,ly)+t*(dx,dy) and segment ea+u*ee: # t = ((ea - origin) × ee) / (dir × ee) # u = ((ea - origin) × dir) / (dir × ee) # where a × b = ax*by - ay*bx (2D cross product) origin = np.array([lx, ly], dtype=np.float32) rel = ea - origin # (E, 2) # Cross products: dir × ee for each (ray, edge) pair # dirs[:,0]*ee[:,1] - dirs[:,1]*ee[:,0] → (R, E) denom = dirs[:, 0:1] * ee[:, 1] - dirs[:, 1:2] * ee[:, 0] # (R, E) # rel × ee → (E,) broadcast to (R, E) via rel doesn't depend on ray rel_cross_ee = rel[:, 0] * ee[:, 1] - rel[:, 1] * ee[:, 0] # (E,) # rel × dir → (R, E) rel_cross_dir = rel[:, 0] * dirs[:, 1:2] - rel[:, 1] * dirs[:, 0:1] # (R, E) # Avoid division by zero (use safe_divide to suppress warnings) valid = np.abs(denom) > 1e-10 safe_denom = np.where(valid, denom, 1.0) t = np.where(valid, rel_cross_ee / safe_denom, -1.0) u = np.where(valid, rel_cross_dir / safe_denom, -1.0) # Hit if 0 <= t <= lr and 0 <= u <= 1 hits = valid & (t >= 0) & (t <= lr) & (u >= 0) & (u <= 1) # (R, E) occluded = np.any(hits, axis=1) # (R,) ratio = float(np.count_nonzero(occluded)) / n_rays return 1.0 - ratio * 0.5
[docs] def get_light_texture_view(self) -> Any: """Return the light accumulation image view for compositing.""" return self._rt_view
[docs] def get_light_sampler(self) -> Any: """Return the sampler for the light accumulation texture.""" return self._rt_sampler
@property def has_lights(self) -> bool: """True if any lights were submitted this frame.""" return len(self._lights) > 0 def _destroy_render_target(self) -> None: """Destroy offscreen RT resources (for resize).""" device = self._engine.ctx.device if self._rt_framebuffer: vk.vkDestroyFramebuffer(device, self._rt_framebuffer, None) if self._rt_render_pass: vk.vkDestroyRenderPass(device, self._rt_render_pass, None) if self._rt_view: vk.vkDestroyImageView(device, self._rt_view, None) if self._rt_image: vk.vkDestroyImage(device, self._rt_image, None) if self._rt_memory: vk.vkFreeMemory(device, self._rt_memory, None)
[docs] def cleanup(self) -> None: """Destroy all GPU resources.""" if not self._ready: return device = self._engine.ctx.device self._destroy_render_target() for obj, fn in [ (self._pipeline, vk.vkDestroyPipeline), (self._pipeline_layout, vk.vkDestroyPipelineLayout), (self._vert_module, vk.vkDestroyShaderModule), (self._frag_module, vk.vkDestroyShaderModule), (self._rt_sampler, vk.vkDestroySampler), ]: if obj: fn(device, obj, None) self._ready = False
def _ray_segment_intersect( ox: float, oy: float, dx: float, dy: float, ax: float, ay: float, bx: float, by: float, max_dist: float, ) -> bool: """Test if ray (ox,oy)->(dx,dy) intersects segment (ax,ay)-(bx,by) within max_dist.""" ex, ey = bx - ax, by - ay denom = dx * ey - dy * ex if abs(denom) < 1e-10: return False t = ((ax - ox) * ey - (ay - oy) * ex) / denom u = ((ax - ox) * dy - (ay - oy) * dx) / denom return 0.0 <= t <= max_dist and 0.0 <= u <= 1.0 def _create_light2d_pipeline( device: Any, vert_module: Any, frag_module: Any, render_pass: Any, extent: tuple[int, int], ) -> tuple[Any, Any]: """Create additive-blend pipeline for 2D light rendering (no vertex buffer).""" ffi = vk.ffi # Push constant range: 48 bytes (LightPush struct) push_range = ffi.new("VkPushConstantRange*") push_range.stageFlags = vk.VK_SHADER_STAGE_VERTEX_BIT | vk.VK_SHADER_STAGE_FRAGMENT_BIT push_range.offset = 0 push_range.size = PUSH_SIZE layout_ci = ffi.new("VkPipelineLayoutCreateInfo*") layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO layout_ci.pushConstantRangeCount = 1 layout_ci.pPushConstantRanges = push_range layout_out = ffi.new("VkPipelineLayout*") result = vk._vulkan._callApi( vk._vulkan.lib.vkCreatePipelineLayout, device, layout_ci, ffi.NULL, layout_out, ) if result != vk.VK_SUCCESS: raise RuntimeError(f"vkCreatePipelineLayout failed: {result}") pipeline_layout = layout_out[0] pi = ffi.new("VkGraphicsPipelineCreateInfo*") pi.sType = vk.VK_STRUCTURE_TYPE_GRAPHICS_PIPELINE_CREATE_INFO # Shader stages stages = ffi.new("VkPipelineShaderStageCreateInfo[2]") main_name = ffi.new("char[]", b"main") stages[0].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO stages[0].stage = vk.VK_SHADER_STAGE_VERTEX_BIT stages[0].module = vert_module stages[0].pName = main_name stages[1].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO stages[1].stage = vk.VK_SHADER_STAGE_FRAGMENT_BIT stages[1].module = frag_module stages[1].pName = main_name pi.stageCount = 2 pi.pStages = stages # No vertex input (positions generated in vertex shader from gl_VertexIndex) vi = ffi.new("VkPipelineVertexInputStateCreateInfo*") vi.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VERTEX_INPUT_STATE_CREATE_INFO pi.pVertexInputState = vi # Input assembly — TRIANGLE_LIST (6 verts per quad) ia = ffi.new("VkPipelineInputAssemblyStateCreateInfo*") ia.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_INPUT_ASSEMBLY_STATE_CREATE_INFO ia.topology = vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST pi.pInputAssemblyState = ia # Viewport state vps = ffi.new("VkPipelineViewportStateCreateInfo*") vps.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VIEWPORT_STATE_CREATE_INFO vps.viewportCount = 1 viewport = ffi.new("VkViewport*") viewport.width = float(extent[0]) viewport.height = float(extent[1]) viewport.maxDepth = 1.0 vps.pViewports = viewport scissor = ffi.new("VkRect2D*") scissor.extent.width = extent[0] scissor.extent.height = extent[1] vps.scissorCount = 1 vps.pScissors = scissor pi.pViewportState = vps # Rasterization rs = ffi.new("VkPipelineRasterizationStateCreateInfo*") rs.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_RASTERIZATION_STATE_CREATE_INFO rs.polygonMode = vk.VK_POLYGON_MODE_FILL rs.lineWidth = 1.0 rs.cullMode = vk.VK_CULL_MODE_NONE pi.pRasterizationState = rs # Multisample ms = ffi.new("VkPipelineMultisampleStateCreateInfo*") ms.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_MULTISAMPLE_STATE_CREATE_INFO ms.rasterizationSamples = vk.VK_SAMPLE_COUNT_1_BIT pi.pMultisampleState = ms # No depth test dss = ffi.new("VkPipelineDepthStencilStateCreateInfo*") dss.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DEPTH_STENCIL_STATE_CREATE_INFO dss.depthTestEnable = 0 dss.depthWriteEnable = 0 pi.pDepthStencilState = dss # Additive blending: src + dst cba = ffi.new("VkPipelineColorBlendAttachmentState*") cba.blendEnable = 1 cba.srcColorBlendFactor = vk.VK_BLEND_FACTOR_ONE cba.dstColorBlendFactor = vk.VK_BLEND_FACTOR_ONE cba.colorBlendOp = vk.VK_BLEND_OP_ADD cba.srcAlphaBlendFactor = vk.VK_BLEND_FACTOR_ONE cba.dstAlphaBlendFactor = vk.VK_BLEND_FACTOR_ONE cba.alphaBlendOp = vk.VK_BLEND_OP_ADD cba.colorWriteMask = ( vk.VK_COLOR_COMPONENT_R_BIT | vk.VK_COLOR_COMPONENT_G_BIT | vk.VK_COLOR_COMPONENT_B_BIT | vk.VK_COLOR_COMPONENT_A_BIT ) cb = ffi.new("VkPipelineColorBlendStateCreateInfo*") cb.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_COLOR_BLEND_STATE_CREATE_INFO cb.attachmentCount = 1 cb.pAttachments = cba pi.pColorBlendState = cb # Dynamic state dyn_states = ffi.new( "VkDynamicState[2]", [ vk.VK_DYNAMIC_STATE_VIEWPORT, vk.VK_DYNAMIC_STATE_SCISSOR, ], ) ds = ffi.new("VkPipelineDynamicStateCreateInfo*") ds.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DYNAMIC_STATE_CREATE_INFO ds.dynamicStateCount = 2 ds.pDynamicStates = dyn_states pi.pDynamicState = ds pi.layout = pipeline_layout pi.renderPass = render_pass pipeline_out = ffi.new("VkPipeline*") result = vk._vulkan._callApi( vk._vulkan.lib.vkCreateGraphicsPipelines, device, ffi.NULL, 1, pi, ffi.NULL, pipeline_out, ) if result != vk.VK_SUCCESS: raise RuntimeError(f"vkCreateGraphicsPipelines failed: {result}") log.debug("Light2D pipeline created (additive blend)") return pipeline_out[0], pipeline_layout