"""2D light accumulation pass — renders Light2D nodes as additive radial gradients.
Renders each light as a screen-space quad with radial falloff. The accumulated
light texture is provided to Draw2DPass for final compositing (multiply blend).
Shadow casting uses a simple 1D angular shadow map per light: for each angle,
a ray is cast from the light center and the nearest occluder intersection
distance is stored. The fragment shader compares its distance against this
map to determine if it is in shadow.
"""
from __future__ import annotations
import logging
import math
import struct
from typing import Any
import numpy as np
import vulkan as vk
from .._types import SHADER_DIR
from ..gpu.memory import create_image
from ..gpu.pipeline import create_shader_module
from ..materials.shader_compiler import compile_shader
from .passes import create_offscreen_pass
__all__ = ["Light2DPass"]
log = logging.getLogger(__name__)
# Push constant layout must match light2d.vert/frag LightPush struct
# vec2 screen_size (8) + vec2 light_pos (8) + float range (4) + float falloff (4)
# + float energy (4) + float pad (4) + vec4 light_colour (16) = 48 bytes
PUSH_SIZE = 48
# Shadow map resolution (1D angular samples per light)
SHADOW_MAP_RESOLUTION = 64
[docs]
class Light2DPass:
"""GPU pass that renders 2D lights to an offscreen accumulation texture.
Usage from the forward renderer:
1. ``begin_frame()`` — clears per-frame submission lists
2. ``submit_light(...)`` / ``submit_occluder(...)`` — queue data
3. ``render(cmd, extent)`` — render all lights to accumulation RT
4. ``get_light_texture_view()`` — returns the image view for compositing
"""
def __init__(self, engine: Any):
self._engine = engine
# Pipeline resources
self._pipeline: Any = None
self._pipeline_layout: Any = None
self._vert_module: Any = None
self._frag_module: Any = None
# Offscreen render target
self._rt_image: Any = None
self._rt_memory: Any = None
self._rt_view: Any = None
self._rt_render_pass: Any = None
self._rt_framebuffer: Any = None
self._rt_sampler: Any = None
self._rt_width = 0
self._rt_height = 0
# Per-frame submissions
self._lights: list[dict] = []
self._occluders: list[list[tuple[float, float]]] = []
# Shadow map (CPU-computed, uploaded as 1D texture per light)
self._shadow_map: np.ndarray = np.ones(SHADOW_MAP_RESOLUTION, dtype=np.float32)
self._ready = False
[docs]
def setup(self) -> None:
"""Create GPU resources: shaders, pipeline, render target."""
e = self._engine
device = e.ctx.device
phys = e.ctx.physical_device
w, h = e.extent
# Compile light2d shaders
vert_spv = compile_shader(SHADER_DIR / "light2d.vert")
frag_spv = compile_shader(SHADER_DIR / "light2d.frag")
self._vert_module = create_shader_module(device, vert_spv)
self._frag_module = create_shader_module(device, frag_spv)
# Create offscreen render target (additive accumulation)
self._create_render_target(device, phys, w, h)
# Create pipeline with additive blending
self._pipeline, self._pipeline_layout = _create_light2d_pipeline(
device,
self._vert_module,
self._frag_module,
self._rt_render_pass,
(w, h),
)
# Sampler for reading the accumulation texture
sampler_info = vk.VkSamplerCreateInfo(
magFilter=vk.VK_FILTER_LINEAR,
minFilter=vk.VK_FILTER_LINEAR,
addressModeU=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
addressModeV=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
addressModeW=vk.VK_SAMPLER_ADDRESS_MODE_CLAMP_TO_EDGE,
anisotropyEnable=vk.VK_FALSE,
unnormalizedCoordinates=vk.VK_FALSE,
compareEnable=vk.VK_FALSE,
mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_LINEAR,
)
self._rt_sampler = vk.vkCreateSampler(device, sampler_info, None)
self._ready = True
log.debug("Light2DPass setup complete (%dx%d)", w, h)
def _create_render_target(self, device: Any, phys: Any, w: int, h: int) -> None:
"""Create the offscreen render target for light accumulation."""
self._rt_width = w
self._rt_height = h
fmt = vk.VK_FORMAT_R16G16B16A16_SFLOAT # HDR accumulation
self._rt_image, self._rt_memory = create_image(
device,
phys,
w,
h,
fmt,
vk.VK_IMAGE_USAGE_COLOR_ATTACHMENT_BIT | vk.VK_IMAGE_USAGE_SAMPLED_BIT,
)
self._rt_view = vk.vkCreateImageView(
device,
vk.VkImageViewCreateInfo(
image=self._rt_image,
viewType=vk.VK_IMAGE_VIEW_TYPE_2D,
format=fmt,
subresourceRange=vk.VkImageSubresourceRange(
aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
baseMipLevel=0,
levelCount=1,
baseArrayLayer=0,
layerCount=1,
),
),
None,
)
# Render pass: clear to black, store result as shader-readable
self._rt_render_pass = create_offscreen_pass(device, fmt, depth_format=0)
self._rt_framebuffer = vk.vkCreateFramebuffer(
device,
vk.VkFramebufferCreateInfo(
renderPass=self._rt_render_pass,
attachmentCount=1,
pAttachments=[self._rt_view],
width=w,
height=h,
layers=1,
),
None,
)
[docs]
def submit_light(
self,
position: tuple[float, float],
colour: tuple[float, float, float],
energy: float,
light_range: float,
falloff: float = 1.0,
blend_mode: str = "add",
shadow_enabled: bool = False,
shadow_colour: tuple[float, ...] = (0.0, 0.0, 0.0, 0.5),
) -> None:
"""Queue a light for rendering this frame."""
self._lights.append(
{
"position": position,
"colour": colour,
"energy": energy,
"range": light_range,
"falloff": falloff,
"blend_mode": blend_mode,
"shadow_enabled": shadow_enabled,
"shadow_colour": shadow_colour,
}
)
[docs]
def submit_occluder(self, polygon_vertices: list[tuple[float, float]]) -> None:
"""Queue an occluder polygon for shadow casting this frame."""
if len(polygon_vertices) >= 2:
self._occluders.append(polygon_vertices)
[docs]
def begin_frame(self) -> None:
"""Clear per-frame submission lists."""
self._lights.clear()
self._occluders.clear()
[docs]
def render(self, cmd: Any, extent: tuple[int, int]) -> None:
"""Render all queued lights to the accumulation render target.
Must be called outside the main render pass (in pre_render phase).
"""
if not self._ready or not self._lights:
return
w, h = extent
# Resize render target if window changed
if w != self._rt_width or h != self._rt_height:
self._destroy_render_target()
self._create_render_target(
self._engine.ctx.device,
self._engine.ctx.physical_device,
w,
h,
)
# Recreate pipeline for new extent
device = self._engine.ctx.device
if self._pipeline:
vk.vkDestroyPipeline(device, self._pipeline, None)
if self._pipeline_layout:
vk.vkDestroyPipelineLayout(device, self._pipeline_layout, None)
self._pipeline, self._pipeline_layout = _create_light2d_pipeline(
device,
self._vert_module,
self._frag_module,
self._rt_render_pass,
(w, h),
)
# Begin offscreen render pass (clears to black)
clear_value = vk.VkClearValue(
color=vk.VkClearColorValue(float32=[0.0, 0.0, 0.0, 0.0]),
)
begin_info = vk.VkRenderPassBeginInfo(
renderPass=self._rt_render_pass,
framebuffer=self._rt_framebuffer,
renderArea=vk.VkRect2D(
offset=vk.VkOffset2D(x=0, y=0),
extent=vk.VkExtent2D(width=w, height=h),
),
clearValueCount=1,
pClearValues=[clear_value],
)
vk.vkCmdBeginRenderPass(cmd, begin_info, vk.VK_SUBPASS_CONTENTS_INLINE)
vk_viewport = vk.VkViewport(
x=0.0,
y=0.0,
width=float(w),
height=float(h),
minDepth=0.0,
maxDepth=1.0,
)
scissor = vk.VkRect2D(
offset=vk.VkOffset2D(x=0, y=0),
extent=vk.VkExtent2D(width=w, height=h),
)
vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline)
vk.vkCmdSetViewport(cmd, 0, 1, [vk_viewport])
vk.vkCmdSetScissor(cmd, 0, 1, [scissor])
# Render each light as a fullscreen quad with push constants
for light in self._lights:
shadow_mult = 1.0
if light["shadow_enabled"] and self._occluders:
shadow_mult = self._compute_shadow_multiplier(light)
px, py = light["position"]
r, g, b = light["colour"]
energy = light["energy"] * shadow_mult
lr = light["range"]
falloff = light.get("falloff", 1.0)
# Pack push constants: must match LightPush struct layout
push_data = struct.pack(
"ff ff ffff ffff",
float(w),
float(h), # screen_size
px,
py, # light_pos
lr,
falloff,
energy,
0.0, # range, falloff, energy, pad
r,
g,
b,
1.0, # light_colour
)
self._engine.push_constants(cmd, self._pipeline_layout, push_data)
vk.vkCmdDraw(cmd, 6, 1, 0, 0) # 6 verts = fullscreen quad
vk.vkCmdEndRenderPass(cmd)
def _compute_shadow_multiplier(self, light: dict) -> float:
"""Vectorised CPU shadow estimation using numpy.
Casts SHADOW_MAP_RESOLUTION rays from the light centre and tests each
against all nearby occluder edges in a single batched operation.
"""
lx, ly = light["position"]
lr = light["range"]
lr_sq = lr * lr
# Pre-filter: collect edges from occluders whose AABB overlaps the light circle
edges_a: list[tuple[float, float]] = []
edges_e: list[tuple[float, float]] = [] # edge vectors (bx-ax, by-ay)
for polygon in self._occluders:
xs = [p[0] for p in polygon]
ys = [p[1] for p in polygon]
mn_x, mx_x, mn_y, mx_y = min(xs), max(xs), min(ys), max(ys)
cx = max(mn_x, min(lx, mx_x))
cy = max(mn_y, min(ly, mx_y))
if (cx - lx) ** 2 + (cy - ly) ** 2 > lr_sq:
continue
n = len(polygon)
for i in range(n):
ax, ay = polygon[i]
bx, by = polygon[(i + 1) % n]
edges_a.append((ax, ay))
edges_e.append((bx - ax, by - ay))
if not edges_a:
return 1.0
# Build numpy arrays: edges (E,2) and ray directions (R,2)
ea = np.array(edges_a, dtype=np.float32) # (E, 2) edge start points
ee = np.array(edges_e, dtype=np.float32) # (E, 2) edge vectors
n_rays = SHADOW_MAP_RESOLUTION
angles = np.linspace(0, math.tau, n_rays, endpoint=False, dtype=np.float32)
dirs = np.column_stack((np.cos(angles), np.sin(angles))) # (R, 2)
# Vectorised ray-segment intersection (all rays × all edges)
# For ray (lx,ly)+t*(dx,dy) and segment ea+u*ee:
# t = ((ea - origin) × ee) / (dir × ee)
# u = ((ea - origin) × dir) / (dir × ee)
# where a × b = ax*by - ay*bx (2D cross product)
origin = np.array([lx, ly], dtype=np.float32)
rel = ea - origin # (E, 2)
# Cross products: dir × ee for each (ray, edge) pair
# dirs[:,0]*ee[:,1] - dirs[:,1]*ee[:,0] → (R, E)
denom = dirs[:, 0:1] * ee[:, 1] - dirs[:, 1:2] * ee[:, 0] # (R, E)
# rel × ee → (E,) broadcast to (R, E) via rel doesn't depend on ray
rel_cross_ee = rel[:, 0] * ee[:, 1] - rel[:, 1] * ee[:, 0] # (E,)
# rel × dir → (R, E)
rel_cross_dir = rel[:, 0] * dirs[:, 1:2] - rel[:, 1] * dirs[:, 0:1] # (R, E)
# Avoid division by zero (use safe_divide to suppress warnings)
valid = np.abs(denom) > 1e-10
safe_denom = np.where(valid, denom, 1.0)
t = np.where(valid, rel_cross_ee / safe_denom, -1.0)
u = np.where(valid, rel_cross_dir / safe_denom, -1.0)
# Hit if 0 <= t <= lr and 0 <= u <= 1
hits = valid & (t >= 0) & (t <= lr) & (u >= 0) & (u <= 1) # (R, E)
occluded = np.any(hits, axis=1) # (R,)
ratio = float(np.count_nonzero(occluded)) / n_rays
return 1.0 - ratio * 0.5
[docs]
def get_light_texture_view(self) -> Any:
"""Return the light accumulation image view for compositing."""
return self._rt_view
[docs]
def get_light_sampler(self) -> Any:
"""Return the sampler for the light accumulation texture."""
return self._rt_sampler
@property
def has_lights(self) -> bool:
"""True if any lights were submitted this frame."""
return len(self._lights) > 0
def _destroy_render_target(self) -> None:
"""Destroy offscreen RT resources (for resize)."""
device = self._engine.ctx.device
if self._rt_framebuffer:
vk.vkDestroyFramebuffer(device, self._rt_framebuffer, None)
if self._rt_render_pass:
vk.vkDestroyRenderPass(device, self._rt_render_pass, None)
if self._rt_view:
vk.vkDestroyImageView(device, self._rt_view, None)
if self._rt_image:
vk.vkDestroyImage(device, self._rt_image, None)
if self._rt_memory:
vk.vkFreeMemory(device, self._rt_memory, None)
[docs]
def cleanup(self) -> None:
"""Destroy all GPU resources."""
if not self._ready:
return
device = self._engine.ctx.device
self._destroy_render_target()
for obj, fn in [
(self._pipeline, vk.vkDestroyPipeline),
(self._pipeline_layout, vk.vkDestroyPipelineLayout),
(self._vert_module, vk.vkDestroyShaderModule),
(self._frag_module, vk.vkDestroyShaderModule),
(self._rt_sampler, vk.vkDestroySampler),
]:
if obj:
fn(device, obj, None)
self._ready = False
def _ray_segment_intersect(
ox: float,
oy: float,
dx: float,
dy: float,
ax: float,
ay: float,
bx: float,
by: float,
max_dist: float,
) -> bool:
"""Test if ray (ox,oy)->(dx,dy) intersects segment (ax,ay)-(bx,by) within max_dist."""
ex, ey = bx - ax, by - ay
denom = dx * ey - dy * ex
if abs(denom) < 1e-10:
return False
t = ((ax - ox) * ey - (ay - oy) * ex) / denom
u = ((ax - ox) * dy - (ay - oy) * dx) / denom
return 0.0 <= t <= max_dist and 0.0 <= u <= 1.0
def _create_light2d_pipeline(
device: Any,
vert_module: Any,
frag_module: Any,
render_pass: Any,
extent: tuple[int, int],
) -> tuple[Any, Any]:
"""Create additive-blend pipeline for 2D light rendering (no vertex buffer)."""
ffi = vk.ffi
# Push constant range: 48 bytes (LightPush struct)
push_range = ffi.new("VkPushConstantRange*")
push_range.stageFlags = vk.VK_SHADER_STAGE_VERTEX_BIT | vk.VK_SHADER_STAGE_FRAGMENT_BIT
push_range.offset = 0
push_range.size = PUSH_SIZE
layout_ci = ffi.new("VkPipelineLayoutCreateInfo*")
layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO
layout_ci.pushConstantRangeCount = 1
layout_ci.pPushConstantRanges = push_range
layout_out = ffi.new("VkPipelineLayout*")
result = vk._vulkan._callApi(
vk._vulkan.lib.vkCreatePipelineLayout,
device,
layout_ci,
ffi.NULL,
layout_out,
)
if result != vk.VK_SUCCESS:
raise RuntimeError(f"vkCreatePipelineLayout failed: {result}")
pipeline_layout = layout_out[0]
pi = ffi.new("VkGraphicsPipelineCreateInfo*")
pi.sType = vk.VK_STRUCTURE_TYPE_GRAPHICS_PIPELINE_CREATE_INFO
# Shader stages
stages = ffi.new("VkPipelineShaderStageCreateInfo[2]")
main_name = ffi.new("char[]", b"main")
stages[0].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
stages[0].stage = vk.VK_SHADER_STAGE_VERTEX_BIT
stages[0].module = vert_module
stages[0].pName = main_name
stages[1].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
stages[1].stage = vk.VK_SHADER_STAGE_FRAGMENT_BIT
stages[1].module = frag_module
stages[1].pName = main_name
pi.stageCount = 2
pi.pStages = stages
# No vertex input (positions generated in vertex shader from gl_VertexIndex)
vi = ffi.new("VkPipelineVertexInputStateCreateInfo*")
vi.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VERTEX_INPUT_STATE_CREATE_INFO
pi.pVertexInputState = vi
# Input assembly — TRIANGLE_LIST (6 verts per quad)
ia = ffi.new("VkPipelineInputAssemblyStateCreateInfo*")
ia.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_INPUT_ASSEMBLY_STATE_CREATE_INFO
ia.topology = vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST
pi.pInputAssemblyState = ia
# Viewport state
vps = ffi.new("VkPipelineViewportStateCreateInfo*")
vps.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VIEWPORT_STATE_CREATE_INFO
vps.viewportCount = 1
viewport = ffi.new("VkViewport*")
viewport.width = float(extent[0])
viewport.height = float(extent[1])
viewport.maxDepth = 1.0
vps.pViewports = viewport
scissor = ffi.new("VkRect2D*")
scissor.extent.width = extent[0]
scissor.extent.height = extent[1]
vps.scissorCount = 1
vps.pScissors = scissor
pi.pViewportState = vps
# Rasterization
rs = ffi.new("VkPipelineRasterizationStateCreateInfo*")
rs.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_RASTERIZATION_STATE_CREATE_INFO
rs.polygonMode = vk.VK_POLYGON_MODE_FILL
rs.lineWidth = 1.0
rs.cullMode = vk.VK_CULL_MODE_NONE
pi.pRasterizationState = rs
# Multisample
ms = ffi.new("VkPipelineMultisampleStateCreateInfo*")
ms.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_MULTISAMPLE_STATE_CREATE_INFO
ms.rasterizationSamples = vk.VK_SAMPLE_COUNT_1_BIT
pi.pMultisampleState = ms
# No depth test
dss = ffi.new("VkPipelineDepthStencilStateCreateInfo*")
dss.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DEPTH_STENCIL_STATE_CREATE_INFO
dss.depthTestEnable = 0
dss.depthWriteEnable = 0
pi.pDepthStencilState = dss
# Additive blending: src + dst
cba = ffi.new("VkPipelineColorBlendAttachmentState*")
cba.blendEnable = 1
cba.srcColorBlendFactor = vk.VK_BLEND_FACTOR_ONE
cba.dstColorBlendFactor = vk.VK_BLEND_FACTOR_ONE
cba.colorBlendOp = vk.VK_BLEND_OP_ADD
cba.srcAlphaBlendFactor = vk.VK_BLEND_FACTOR_ONE
cba.dstAlphaBlendFactor = vk.VK_BLEND_FACTOR_ONE
cba.alphaBlendOp = vk.VK_BLEND_OP_ADD
cba.colorWriteMask = (
vk.VK_COLOR_COMPONENT_R_BIT
| vk.VK_COLOR_COMPONENT_G_BIT
| vk.VK_COLOR_COMPONENT_B_BIT
| vk.VK_COLOR_COMPONENT_A_BIT
)
cb = ffi.new("VkPipelineColorBlendStateCreateInfo*")
cb.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_COLOR_BLEND_STATE_CREATE_INFO
cb.attachmentCount = 1
cb.pAttachments = cba
pi.pColorBlendState = cb
# Dynamic state
dyn_states = ffi.new(
"VkDynamicState[2]",
[
vk.VK_DYNAMIC_STATE_VIEWPORT,
vk.VK_DYNAMIC_STATE_SCISSOR,
],
)
ds = ffi.new("VkPipelineDynamicStateCreateInfo*")
ds.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DYNAMIC_STATE_CREATE_INFO
ds.dynamicStateCount = 2
ds.pDynamicStates = dyn_states
pi.pDynamicState = ds
pi.layout = pipeline_layout
pi.renderPass = render_pass
pipeline_out = ffi.new("VkPipeline*")
result = vk._vulkan._callApi(
vk._vulkan.lib.vkCreateGraphicsPipelines,
device,
ffi.NULL,
1,
pi,
ffi.NULL,
pipeline_out,
)
if result != vk.VK_SUCCESS:
raise RuntimeError(f"vkCreateGraphicsPipelines failed: {result}")
log.debug("Light2D pipeline created (additive blend)")
return pipeline_out[0], pipeline_layout