"""Off-screen render pass that outputs entity IDs to an R32_UINT framebuffer."""
from __future__ import annotations
import logging
import struct
from pathlib import Path
from typing import Any
import vulkan as vk
from ..gpu.memory import create_buffer, create_image
from ..gpu.pipeline import create_pick_pipeline
from ..renderer.pass_helpers import load_shader_modules
from ..renderer.passes import create_pick_pass
# Names exported by ``from <module> import *``.
__all__ = ["PickPass"]
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Directory the pick shaders are loaded from: five ``.parent`` hops up from
# this file's resolved path, then "shaders".
SHADER_DIR = Path(__file__).resolve().parent.parent.parent.parent.parent / "shaders"
# Value the ID attachment is cleared to before drawing; a readback equal to
# this is reported by PickPass.pick() as -1 (nothing hit).
MISS = 0xFFFFFFFF
class PickPass:
    """Render entity IDs to a dedicated framebuffer for mouse picking.

    Typical lifecycle: construct, call :meth:`create` once, call :meth:`pick`
    whenever an ID is needed, call :meth:`resize` when the extent changes and
    :meth:`destroy` when done.

    The command pool, command buffer and fence are created on the first
    :meth:`create` and survive :meth:`resize`; every other resource is
    rebuilt on each :meth:`create`.
    """

    # (attribute name, ``vk`` destroy/free function name) in teardown order.
    # Function names are resolved lazily so that tearing down an instance
    # whose handles are all unset never touches the Vulkan binding.
    _RESOURCE_DESTROYERS = (
        ("_framebuffer", "vkDestroyFramebuffer"),
        ("_pick_view", "vkDestroyImageView"),
        ("_pick_image", "vkDestroyImage"),
        ("_pick_memory", "vkFreeMemory"),
        ("_depth_view", "vkDestroyImageView"),
        ("_depth_image", "vkDestroyImage"),
        ("_depth_memory", "vkFreeMemory"),
        ("_pipeline", "vkDestroyPipeline"),
        ("_pipeline_layout", "vkDestroyPipelineLayout"),
        ("_render_pass", "vkDestroyRenderPass"),
        ("_staging_buf", "vkDestroyBuffer"),
        ("_staging_mem", "vkFreeMemory"),
    )

    def __init__(
        self,
        device: Any,
        physical_device: Any,
        graphics_queue: Any,
        queue_family_index: int,
        extent: tuple[int, int],
        descriptor_layout: Any,
        descriptor_set: Any,
    ) -> None:
        """Store the Vulkan handles and settings; no GPU work happens here.

        Args:
            device: Logical device used for every create/destroy call.
            physical_device: Passed to the image/buffer allocation helpers.
            graphics_queue: Queue the pick command buffer is submitted to.
            queue_family_index: Queue family the command pool is created for.
            extent: ``(width, height)`` of the pick target in pixels.
            descriptor_layout: Descriptor set layout given to the pipeline.
            descriptor_set: Descriptor set bound at set index 0 when drawing.
        """
        self.device = device
        self.physical_device = physical_device
        self.graphics_queue = graphics_queue
        self.queue_family_index = queue_family_index
        self.extent = extent
        self.descriptor_layout = descriptor_layout
        self.descriptor_set = descriptor_set

        # Resources (populated in create())
        self._pick_image: Any = None
        self._pick_memory: Any = None
        self._pick_view: Any = None
        self._depth_image: Any = None
        self._depth_memory: Any = None
        self._depth_view: Any = None
        self._render_pass: Any = None
        self._framebuffer: Any = None
        self._pipeline: Any = None
        self._pipeline_layout: Any = None
        self._staging_buf: Any = None
        self._staging_mem: Any = None
        self._cmd_pool: Any = None
        self._cmd_buf: Any = None
        self._fence: Any = None

    def create(self) -> None:
        """Create the GPU resources for the current ``self.extent``.

        Builds the R32_UINT colour target, the D32_SFLOAT depth target, the
        render pass, framebuffer, pipeline and a 4-byte host-visible staging
        buffer. The command pool, command buffer and fence are created only
        if they do not exist yet, so calling this again from :meth:`resize`
        does not leak them.
        """
        w, h = self.extent
        depth_fmt = vk.VK_FORMAT_D32_SFLOAT

        # Colour image (R32_UINT); TRANSFER_SRC so one pixel can be copied out.
        self._pick_image, self._pick_memory = create_image(
            self.device, self.physical_device, w, h,
            vk.VK_FORMAT_R32_UINT,
            vk.VK_IMAGE_USAGE_COLOR_ATTACHMENT_BIT | vk.VK_IMAGE_USAGE_TRANSFER_SRC_BIT,
        )
        self._pick_view = self._create_view(
            self._pick_image, vk.VK_FORMAT_R32_UINT, vk.VK_IMAGE_ASPECT_COLOR_BIT,
        )

        # Depth image
        self._depth_image, self._depth_memory = create_image(
            self.device, self.physical_device, w, h,
            depth_fmt,
            vk.VK_IMAGE_USAGE_DEPTH_STENCIL_ATTACHMENT_BIT,
        )
        self._depth_view = self._create_view(
            self._depth_image, depth_fmt, vk.VK_IMAGE_ASPECT_DEPTH_BIT,
        )

        # Render pass
        self._render_pass = create_pick_pass(self.device, depth_fmt)

        # Framebuffer: attachment 0 is the ID target, attachment 1 is depth.
        self._framebuffer = vk.vkCreateFramebuffer(self.device, vk.VkFramebufferCreateInfo(
            renderPass=self._render_pass,
            attachmentCount=2,
            pAttachments=[self._pick_view, self._depth_view],
            width=w, height=h, layers=1,
        ), None)

        # Load pick shaders and create the pipeline. The modules are only
        # needed while the pipeline is being built, so release them even if
        # pipeline creation raises.
        vert_mod, frag_mod = load_shader_modules(self.device, SHADER_DIR, "pick.vert", "pick.frag")
        try:
            self._pipeline, self._pipeline_layout = create_pick_pipeline(
                self.device, vert_mod, frag_mod,
                self._render_pass, self.extent, self.descriptor_layout,
            )
        finally:
            vk.vkDestroyShaderModule(self.device, vert_mod, None)
            vk.vkDestroyShaderModule(self.device, frag_mod, None)

        # Staging buffer for pixel readback (4 bytes = one R32_UINT texel)
        self._staging_buf, self._staging_mem = create_buffer(
            self.device, self.physical_device, 4,
            vk.VK_BUFFER_USAGE_TRANSFER_DST_BIT,
            vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
        )

        # Command pool + buffer for pick operations. _destroy_resources()
        # keeps these alive, so only create them when they are missing.
        if not self._cmd_pool:
            pool_info = vk.VkCommandPoolCreateInfo(
                flags=vk.VK_COMMAND_POOL_CREATE_RESET_COMMAND_BUFFER_BIT,
                queueFamilyIndex=self.queue_family_index,
            )
            self._cmd_pool = vk.vkCreateCommandPool(self.device, pool_info, None)
        if not self._cmd_buf:
            alloc_info = vk.VkCommandBufferAllocateInfo(
                commandPool=self._cmd_pool,
                level=vk.VK_COMMAND_BUFFER_LEVEL_PRIMARY,
                commandBufferCount=1,
            )
            self._cmd_buf = vk.vkAllocateCommandBuffers(self.device, alloc_info)[0]

        # Fence for synchronization (also kept across resize)
        if not self._fence:
            self._fence = vk.vkCreateFence(self.device, vk.VkFenceCreateInfo(), None)

    def _create_view(self, image: Any, fmt: Any, aspect: Any) -> Any:
        """Return a 2D view of mip 0 / layer 0 of ``image`` for ``aspect``."""
        return vk.vkCreateImageView(self.device, vk.VkImageViewCreateInfo(
            image=image,
            viewType=vk.VK_IMAGE_VIEW_TYPE_2D,
            format=fmt,
            subresourceRange=vk.VkImageSubresourceRange(
                aspectMask=aspect,
                baseMipLevel=0, levelCount=1,
                baseArrayLayer=0, layerCount=1,
            ),
        ), None)

    def pick(
        self,
        x: int,
        y: int,
        view_proj_data: bytes,
        vertex_buffer: Any,
        index_buffer: Any,
        index_count: int,
        instance_count: int,
    ) -> int:
        """Render the pick pass and read the entity ID at pixel ``(x, y)``.

        The call is synchronous: it records and submits one command buffer,
        waits on the fence, then reads four bytes back from the staging
        buffer.

        Args:
            x: Pixel column, ``0 <= x < width``.
            y: Pixel row, ``0 <= y < height``.
            view_proj_data: Raw bytes pushed as push constants at offset 0
                for the vertex and fragment stages.
            vertex_buffer: Vertex buffer bound at binding 0, offset 0.
            index_buffer: Index buffer, bound as 16-bit indices.
            index_count: Number of indices to draw.
            instance_count: Number of instances to draw.

        Returns:
            The entity ID stored at the pixel, or ``-1`` when the coordinates
            are outside the extent or the pixel still holds the clear value.
        """
        w, h = self.extent
        if x < 0 or x >= w or y < 0 or y >= h:
            return -1

        cmd = self._cmd_buf
        vk.vkResetCommandBuffer(cmd, 0)
        vk.vkBeginCommandBuffer(cmd, vk.VkCommandBufferBeginInfo(
            flags=vk.VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT,
        ))

        # Begin render pass — clear to MISS (0xFFFFFFFF) so untouched pixels
        # read back as "nothing hit".
        clear_values = [
            vk.VkClearValue(color=vk.VkClearColorValue(uint32=[MISS, 0, 0, 0])),
            vk.VkClearValue(depthStencil=vk.VkClearDepthStencilValue(depth=1.0, stencil=0)),
        ]
        full_area = vk.VkRect2D(
            offset=vk.VkOffset2D(x=0, y=0),
            extent=vk.VkExtent2D(width=w, height=h),
        )
        rp_begin = vk.VkRenderPassBeginInfo(
            renderPass=self._render_pass,
            framebuffer=self._framebuffer,
            renderArea=full_area,
            clearValueCount=2,
            pClearValues=clear_values,
        )
        vk.vkCmdBeginRenderPass(cmd, rp_begin, vk.VK_SUBPASS_CONTENTS_INLINE)

        # Set viewport/scissor to cover the whole target
        viewport = vk.VkViewport(
            x=0.0, y=0.0, width=float(w), height=float(h),
            minDepth=0.0, maxDepth=1.0,
        )
        vk.vkCmdSetViewport(cmd, 0, 1, [viewport])
        scissor = vk.VkRect2D(
            offset=vk.VkOffset2D(x=0, y=0),
            extent=vk.VkExtent2D(width=w, height=h),
        )
        vk.vkCmdSetScissor(cmd, 0, 1, [scissor])

        # Bind pipeline + descriptors
        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline)
        vk.vkCmdBindDescriptorSets(
            cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline_layout,
            0, 1, [self.descriptor_set], 0, None,
        )

        # Push constants (view + proj). This goes through the raw cffi entry
        # point with a char[] copy of the bytes; ``cbuf`` must stay referenced
        # until the call returns.
        ffi = vk.ffi
        cbuf = ffi.new("char[]", view_proj_data)
        vk._vulkan.lib.vkCmdPushConstants(
            cmd, self._pipeline_layout,
            vk.VK_SHADER_STAGE_VERTEX_BIT | vk.VK_SHADER_STAGE_FRAGMENT_BIT,
            0, len(view_proj_data), cbuf,
        )

        # Bind vertex/index buffers and draw
        vk.vkCmdBindVertexBuffers(cmd, 0, 1, [vertex_buffer], [0])
        vk.vkCmdBindIndexBuffer(cmd, index_buffer, 0, vk.VK_INDEX_TYPE_UINT16)
        vk.vkCmdDrawIndexed(cmd, index_count, instance_count, 0, 0, 0)
        vk.vkCmdEndRenderPass(cmd)

        # Transition pick image for transfer read.
        # NOTE(review): oldLayout assumes the render pass leaves the colour
        # attachment in COLOR_ATTACHMENT_OPTIMAL — confirm in create_pick_pass.
        barrier = vk.VkImageMemoryBarrier(
            srcAccessMask=vk.VK_ACCESS_COLOR_ATTACHMENT_WRITE_BIT,
            dstAccessMask=vk.VK_ACCESS_TRANSFER_READ_BIT,
            oldLayout=vk.VK_IMAGE_LAYOUT_COLOR_ATTACHMENT_OPTIMAL,
            newLayout=vk.VK_IMAGE_LAYOUT_TRANSFER_SRC_OPTIMAL,
            srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
            image=self._pick_image,
            subresourceRange=vk.VkImageSubresourceRange(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                baseMipLevel=0, levelCount=1,
                baseArrayLayer=0, layerCount=1,
            ),
        )
        vk.vkCmdPipelineBarrier(
            cmd,
            vk.VK_PIPELINE_STAGE_COLOR_ATTACHMENT_OUTPUT_BIT,
            vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
            0, 0, None, 0, None, 1, [barrier],
        )

        # Copy single pixel to staging buffer
        region = vk.VkBufferImageCopy(
            bufferOffset=0,
            bufferRowLength=0,
            bufferImageHeight=0,
            imageSubresource=vk.VkImageSubresourceLayers(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                mipLevel=0,
                baseArrayLayer=0,
                layerCount=1,
            ),
            imageOffset=vk.VkOffset3D(x=x, y=y, z=0),
            imageExtent=vk.VkExtent3D(width=1, height=1, depth=1),
        )
        vk.vkCmdCopyImageToBuffer(
            cmd, self._pick_image,
            vk.VK_IMAGE_LAYOUT_TRANSFER_SRC_OPTIMAL,
            self._staging_buf, 1, [region],
        )
        vk.vkEndCommandBuffer(cmd)

        # Submit and wait (timeout is in nanoseconds: 2 s).
        # NOTE(review): how a timeout is reported (exception vs. return code)
        # depends on the binding; the readback below assumes the fence
        # signalled — confirm.
        submit_info = vk.VkSubmitInfo(
            commandBufferCount=1,
            pCommandBuffers=[cmd],
        )
        vk.vkQueueSubmit(self.graphics_queue, 1, [submit_info], self._fence)
        vk.vkWaitForFences(self.device, 1, [self._fence], vk.VK_TRUE, 2_000_000_000)
        vk.vkResetFences(self.device, 1, [self._fence])

        # Read back the pixel — vkMapMemory returns a ffi buffer directly.
        # Unmap in ``finally`` so a failed decode cannot leave the memory mapped.
        mapped = vk.vkMapMemory(self.device, self._staging_mem, 0, 4, 0)
        try:
            entity_id = struct.unpack("=I", bytes(mapped[:4]))[0]
        finally:
            vk.vkUnmapMemory(self.device, self._staging_mem)
        return entity_id if entity_id != MISS else -1

    def resize(self, extent: tuple[int, int]) -> None:
        """Recreate framebuffer resources for new extent.

        The command pool, command buffer and fence are kept and reused.
        """
        self._destroy_resources()
        self.extent = extent
        self.create()

    def _release(self, attr: str, destroyer_name: str) -> None:
        """Destroy the handle stored in ``attr`` (if set) and clear it.

        Clearing the attribute afterwards makes repeated teardown a no-op
        instead of a double free.
        """
        handle = getattr(self, attr)
        if handle:
            getattr(vk, destroyer_name)(self.device, handle, None)
            setattr(self, attr, None)

    def _destroy_resources(self) -> None:
        """Destroy GPU resources (not command pool/fence)."""
        for attr, destroyer_name in self._RESOURCE_DESTROYERS:
            self._release(attr, destroyer_name)

    def destroy(self) -> None:
        """Clean up all resources. Safe to call more than once."""
        self._destroy_resources()
        self._release("_fence", "vkDestroyFence")
        self._release("_cmd_pool", "vkDestroyCommandPool")
        # The command buffer is not freed separately (the pool owned it);
        # drop the stale handle so a later create() allocates a fresh one.
        self._cmd_buf = None