# Source code for simvx.graphics.gpu.memory

"""Buffer and image allocation helpers."""


from __future__ import annotations

import logging
from typing import Any

import numpy as np
import vulkan as vk

# Module-level logger named after this module's import path.
log = logging.getLogger(__name__)

# Public API: the names exported by a star-import of this module.
# The private helper ``_find_memory_type`` is intentionally not listed.
__all__ = [
    "create_buffer",
    "create_image",
    "create_sampler",
    "transition_image_layout",
    "upload_numpy",
    "upload_image_data",
    "update_image_data",
    "create_indirect_buffer",
    "begin_single_time_commands",
    "end_single_time_commands",
]

def _find_memory_type(physical_device: Any, type_filter: int, properties: int) -> int:
    """Return the index of the first memory type usable for an allocation.

    Args:
        physical_device: Physical device whose memory properties are queried.
        type_filter: Bitmask of acceptable memory type indices (bit ``i`` set
            means memory type ``i`` is allowed).
        properties: Property flags that must all be present on the chosen
            memory type.

    Returns:
        The lowest memory type index that is allowed by ``type_filter`` and
        whose property flags include every bit of ``properties``.

    Raises:
        RuntimeError: If no memory type satisfies both conditions.
    """
    device_memory = vk.vkGetPhysicalDeviceMemoryProperties(physical_device)
    for index in range(device_memory.memoryTypeCount):
        # Skip types the resource's requirements do not permit.
        if not type_filter & (1 << index):
            continue
        available = device_memory.memoryTypes[index].propertyFlags
        # Every requested property bit must be set, not just some of them.
        if (available & properties) == properties:
            return index
    raise RuntimeError("Failed to find suitable memory type")


def create_buffer(
    device: Any,
    physical_device: Any,
    size: int,
    usage: int,
    memory_flags: int,
) -> tuple[Any, Any]:
    """Create a VkBuffer with bound memory. Returns (buffer, memory).

    Args:
        device: Logical device used to create the buffer and allocate memory.
        physical_device: Physical device queried for a suitable memory type.
        size: Buffer size in bytes; must be positive.
        usage: ``VkBufferUsageFlags`` bitmask for the buffer.
        memory_flags: ``VkMemoryPropertyFlags`` the backing memory must have.

    Returns:
        ``(buffer, memory)`` with ``memory`` bound to ``buffer`` at offset 0.

    Raises:
        ValueError: If ``size`` is not positive.
        RuntimeError: If no memory type satisfies ``memory_flags``.
    """
    # Vulkan requires VkBufferCreateInfo.size > 0; fail early with a clear
    # message instead of handing an invalid size to the driver.
    if size <= 0:
        raise ValueError(f"Buffer size must be positive, got {size}")

    buf_info = vk.VkBufferCreateInfo(
        size=size,
        usage=usage,
        sharingMode=vk.VK_SHARING_MODE_EXCLUSIVE,
    )
    buffer = vk.vkCreateBuffer(device, buf_info, None)

    memory = None
    try:
        mem_reqs = vk.vkGetBufferMemoryRequirements(device, buffer)
        alloc_info = vk.VkMemoryAllocateInfo(
            allocationSize=mem_reqs.size,
            memoryTypeIndex=_find_memory_type(physical_device, mem_reqs.memoryTypeBits, memory_flags),
        )
        memory = vk.vkAllocateMemory(device, alloc_info, None)
        vk.vkBindBufferMemory(device, buffer, memory, 0)
    except Exception:
        # Release whatever was created so a failed call does not leak handles.
        vk.vkDestroyBuffer(device, buffer, None)
        if memory is not None:
            vk.vkFreeMemory(device, memory, None)
        raise
    return buffer, memory
def create_image(
    device: Any,
    physical_device: Any,
    width: int,
    height: int,
    fmt: int,
    usage: int,
) -> tuple[Any, Any]:
    """Create a VkImage with bound memory. Returns (image, memory).

    The image is 2D, single-mip, single-layer, single-sampled, optimally
    tiled, exclusively owned, starts in ``VK_IMAGE_LAYOUT_UNDEFINED`` and is
    backed by device-local memory.

    Args:
        device: Logical device used to create the image and allocate memory.
        physical_device: Physical device queried for a suitable memory type.
        width: Image width in texels; must be positive.
        height: Image height in texels; must be positive.
        fmt: ``VkFormat`` of the image.
        usage: ``VkImageUsageFlags`` bitmask for the image.

    Returns:
        ``(image, memory)`` with ``memory`` bound to ``image`` at offset 0.

    Raises:
        ValueError: If ``width`` or ``height`` is not positive.
        RuntimeError: If no device-local memory type is available.
    """
    # Vulkan requires every extent component to be greater than zero.
    if width <= 0 or height <= 0:
        raise ValueError(f"Image dimensions must be positive, got {width}x{height}")

    img_info = vk.VkImageCreateInfo(
        imageType=vk.VK_IMAGE_TYPE_2D,
        format=fmt,
        extent=vk.VkExtent3D(width=width, height=height, depth=1),
        mipLevels=1,
        arrayLayers=1,
        samples=vk.VK_SAMPLE_COUNT_1_BIT,
        tiling=vk.VK_IMAGE_TILING_OPTIMAL,
        usage=usage,
        sharingMode=vk.VK_SHARING_MODE_EXCLUSIVE,
        initialLayout=vk.VK_IMAGE_LAYOUT_UNDEFINED,
    )
    image = vk.vkCreateImage(device, img_info, None)

    memory = None
    try:
        mem_reqs = vk.vkGetImageMemoryRequirements(device, image)
        alloc_info = vk.VkMemoryAllocateInfo(
            allocationSize=mem_reqs.size,
            memoryTypeIndex=_find_memory_type(
                physical_device,
                mem_reqs.memoryTypeBits,
                vk.VK_MEMORY_PROPERTY_DEVICE_LOCAL_BIT,
            ),
        )
        memory = vk.vkAllocateMemory(device, alloc_info, None)
        vk.vkBindImageMemory(device, image, memory, 0)
    except Exception:
        # Release whatever was created so a failed call does not leak handles.
        vk.vkDestroyImage(device, image, None)
        if memory is not None:
            vk.vkFreeMemory(device, memory, None)
        raise
    return image, memory
def create_sampler(device: Any, filter_mode: int = vk.VK_FILTER_LINEAR) -> Any:
    """Create a VkSampler with configurable filtering.

    Args:
        device: Logical device that owns the sampler.
        filter_mode: ``VkFilter`` applied to both magnification and
            minification.

    Returns:
        The sampler handle. It uses linear mipmap mode, repeat addressing on
        all three axes, no anisotropy, zero LOD bias and a LOD range of 0..0.
    """
    repeat = vk.VK_SAMPLER_ADDRESS_MODE_REPEAT
    return vk.vkCreateSampler(
        device,
        vk.VkSamplerCreateInfo(
            magFilter=filter_mode,
            minFilter=filter_mode,
            mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_LINEAR,
            addressModeU=repeat,
            addressModeV=repeat,
            addressModeW=repeat,
            mipLodBias=0.0,
            anisotropyEnable=False,
            maxAnisotropy=1.0,
            minLod=0.0,
            maxLod=0.0,
        ),
        None,
    )
def transition_image_layout(
    device: Any,
    queue: Any,
    cmd_pool: Any,
    image: Any,
    old_layout: int,
    new_layout: int,
    aspect_mask: int = vk.VK_IMAGE_ASPECT_COLOR_BIT,
) -> None:
    """Transition image layout via one-time command buffer.

    Records a single image memory barrier covering mip level 0 / array layer 0
    and blocks until the queue has finished executing it.

    Args:
        device: Logical device owning ``cmd_pool``.
        queue: Queue the barrier is submitted to.
        cmd_pool: Command pool the temporary command buffer is allocated from.
        image: Image whose layout is changed.
        old_layout: Current ``VkImageLayout`` of the image.
        new_layout: ``VkImageLayout`` to transition to.
        aspect_mask: Image aspect(s) covered by the barrier.
    """
    # (access mask, pipeline stage) for the layouts this helper knows about.
    # The same pairing is used whether the layout is the source or the
    # destination of the transition.
    layout_sync = {
        vk.VK_IMAGE_LAYOUT_COLOR_ATTACHMENT_OPTIMAL: (
            vk.VK_ACCESS_COLOR_ATTACHMENT_WRITE_BIT,
            vk.VK_PIPELINE_STAGE_COLOR_ATTACHMENT_OUTPUT_BIT,
        ),
        vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL: (
            vk.VK_ACCESS_TRANSFER_WRITE_BIT,
            vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
        ),
        vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL: (
            vk.VK_ACCESS_SHADER_READ_BIT,
            vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT,
        ),
    }
    # Any other layout (e.g. VK_IMAGE_LAYOUT_UNDEFINED) falls back to an empty
    # access mask at the very start / very end of the pipeline.
    src_access, src_stage = layout_sync.get(old_layout, (0, vk.VK_PIPELINE_STAGE_TOP_OF_PIPE_BIT))
    dst_access, dst_stage = layout_sync.get(new_layout, (0, vk.VK_PIPELINE_STAGE_BOTTOM_OF_PIPE_BIT))

    barrier = vk.VkImageMemoryBarrier(
        srcAccessMask=src_access,
        dstAccessMask=dst_access,
        oldLayout=old_layout,
        newLayout=new_layout,
        # No queue family ownership transfer.
        srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
        dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
        image=image,
        subresourceRange=vk.VkImageSubresourceRange(
            aspectMask=aspect_mask,
            baseMipLevel=0,
            levelCount=1,
            baseArrayLayer=0,
            layerCount=1,
        ),
    )

    # Reuse the module's one-shot command buffer helpers instead of repeating
    # the allocate/begin and end/submit/wait/free sequences inline.
    cmd = begin_single_time_commands(device, cmd_pool)
    vk.vkCmdPipelineBarrier(cmd, src_stage, dst_stage, 0, 0, None, 0, None, 1, [barrier])
    end_single_time_commands(device, queue, cmd_pool, cmd)
def upload_numpy(device: Any, memory: Any, data: np.ndarray) -> None:
    """Map device memory and copy a numpy array into it.

    The array is written starting at offset 0 of ``memory`` in C-contiguous
    order. Empty arrays are a no-op.

    Args:
        device: Logical device owning ``memory``.
        memory: Host-mappable device memory to write into.
        data: Array whose bytes are copied; it may be non-contiguous.
    """
    # ``ctypes.data`` is the address of the first element only; for a strided
    # or sliced view a raw copy of ``nbytes`` from there would read the wrong
    # bytes. ``ascontiguousarray`` is a no-op for already-contiguous input.
    contiguous = np.ascontiguousarray(data)
    size = contiguous.nbytes
    # vkMapMemory does not accept a zero-byte range, and there is nothing to
    # copy anyway.
    if size == 0:
        return

    dst = vk.vkMapMemory(device, memory, 0, size, 0)
    try:
        vk.ffi.memmove(dst, vk.ffi.cast("void*", contiguous.ctypes.data), size)
    finally:
        # Always unmap, even if the copy raised.
        vk.vkUnmapMemory(device, memory)
def begin_single_time_commands(device: Any, cmd_pool: Any) -> Any:
    """Allocate and begin a one-time command buffer.

    Args:
        device: Logical device owning ``cmd_pool``.
        cmd_pool: Command pool to allocate the primary command buffer from.

    Returns:
        A primary command buffer in the recording state, begun with the
        one-time-submit usage flag.
    """
    allocated = vk.vkAllocateCommandBuffers(
        device,
        vk.VkCommandBufferAllocateInfo(
            commandPool=cmd_pool,
            level=vk.VK_COMMAND_BUFFER_LEVEL_PRIMARY,
            commandBufferCount=1,
        ),
    )
    # Exactly one buffer was requested.
    cmd = allocated[0]
    begin_info = vk.VkCommandBufferBeginInfo(
        flags=vk.VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT,
    )
    vk.vkBeginCommandBuffer(cmd, begin_info)
    return cmd
def end_single_time_commands(device: Any, queue: Any, cmd_pool: Any, cmd: Any) -> None:
    """End, submit, wait, and free a one-time command buffer.

    Args:
        device: Logical device owning ``cmd_pool``.
        queue: Queue the command buffer is submitted to.
        cmd_pool: Command pool ``cmd`` was allocated from.
        cmd: Command buffer in the recording state.
    """
    try:
        vk.vkEndCommandBuffer(cmd)
        submit = vk.VkSubmitInfo(commandBufferCount=1, pCommandBuffers=[cmd])
        vk.vkQueueSubmit(queue, 1, [submit], None)
        # Block until the GPU is done so the buffer can be freed immediately.
        vk.vkQueueWaitIdle(queue)
    finally:
        # Return the buffer to the pool even if ending/submitting/waiting
        # raised, so failed one-shot submissions do not leak command buffers.
        vk.vkFreeCommandBuffers(device, cmd_pool, 1, [cmd])
def upload_image_data(
    device: Any,
    physical_device: Any,
    queue: Any,
    cmd_pool: Any,
    pixels: np.ndarray,
    width: int,
    height: int,
    fmt: int = vk.VK_FORMAT_R8G8B8A8_UNORM,
) -> tuple[Any, Any]:
    """Upload pixel data to a device-local image via staging buffer.

    The new image is left in ``VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL``.
    The staging buffer is always released, and the image is destroyed again
    if any step after its creation fails.

    Args:
        device: Logical device used for all resources.
        physical_device: Physical device queried for memory types.
        queue: Queue used for the layout transitions and the copy.
        cmd_pool: Command pool for the temporary command buffers.
        pixels: RGBA uint8 array, shape (height, width, 4). Non-contiguous
            input is copied into contiguous form first.
        width: Image width in texels.
        height: Image height in texels.
        fmt: ``VkFormat`` of the created image.

    Returns:
        (image, memory)
    """
    ffi = vk.ffi
    # A raw memmove from ``ctypes.data`` is only correct for C-contiguous
    # storage; this is a no-op when ``pixels`` already is.
    pixels = np.ascontiguousarray(pixels)
    pixel_size = pixels.nbytes
    # NOTE(review): pixel_size is not checked against width * height * texel
    # size; callers must pass matching dimensions — confirm at call sites.

    # Staging buffer
    staging_buf, staging_mem = create_buffer(
        device,
        physical_device,
        pixel_size,
        vk.VK_BUFFER_USAGE_TRANSFER_SRC_BIT,
        vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
    )
    image = None
    image_mem = None
    try:
        dst = vk.vkMapMemory(device, staging_mem, 0, pixel_size, 0)
        try:
            ffi.memmove(dst, ffi.cast("void*", pixels.ctypes.data), pixel_size)
        finally:
            vk.vkUnmapMemory(device, staging_mem)

        # Device-local image
        image, image_mem = create_image(
            device,
            physical_device,
            width,
            height,
            fmt,
            vk.VK_IMAGE_USAGE_TRANSFER_DST_BIT | vk.VK_IMAGE_USAGE_SAMPLED_BIT,
        )

        # Transition UNDEFINED → TRANSFER_DST
        transition_image_layout(
            device,
            queue,
            cmd_pool,
            image,
            vk.VK_IMAGE_LAYOUT_UNDEFINED,
            vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
        )

        # Copy staging buffer → image
        cmd = begin_single_time_commands(device, cmd_pool)
        region = vk.VkBufferImageCopy(
            bufferOffset=0,
            # 0/0 means the buffer rows are tightly packed to imageExtent.
            bufferRowLength=0,
            bufferImageHeight=0,
            imageSubresource=vk.VkImageSubresourceLayers(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                mipLevel=0,
                baseArrayLayer=0,
                layerCount=1,
            ),
            imageOffset=vk.VkOffset3D(x=0, y=0, z=0),
            imageExtent=vk.VkExtent3D(width=width, height=height, depth=1),
        )
        vk.vkCmdCopyBufferToImage(
            cmd,
            staging_buf,
            image,
            vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
            1,
            [region],
        )
        end_single_time_commands(device, queue, cmd_pool, cmd)

        # Transition TRANSFER_DST → SHADER_READ_ONLY
        transition_image_layout(
            device,
            queue,
            cmd_pool,
            image,
            vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
            vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
        )
    except Exception:
        # The caller never receives the handles on failure, so release them.
        if image is not None:
            vk.vkDestroyImage(device, image, None)
            vk.vkFreeMemory(device, image_mem, None)
        raise
    finally:
        # Cleanup staging (on success and on failure alike)
        vk.vkDestroyBuffer(device, staging_buf, None)
        vk.vkFreeMemory(device, staging_mem, None)
    return image, image_mem
def update_image_data(
    device: Any,
    physical_device: Any,
    queue: Any,
    cmd_pool: Any,
    image: Any,
    pixels: np.ndarray,
    width: int,
    height: int,
) -> None:
    """Re-upload pixel data to an existing VkImage (same dimensions).

    Transitions the image from SHADER_READ_ONLY → TRANSFER_DST, copies the
    new pixel data via a staging buffer, then transitions back to
    SHADER_READ_ONLY. The staging buffer is released even if a step fails.

    Args:
        device: Logical device used for all resources.
        physical_device: Physical device queried for memory types.
        queue: Queue used for the layout transitions and the copy.
        cmd_pool: Command pool for the temporary command buffers.
        image: Existing image, expected to be in
            ``VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL``.
        pixels: Pixel data to upload. Non-contiguous input is copied into
            contiguous form first.
        width: Width of the copied region in texels.
        height: Height of the copied region in texels.
    """
    ffi = vk.ffi
    # A raw memmove from ``ctypes.data`` is only correct for C-contiguous
    # storage; this is a no-op when ``pixels`` already is.
    pixels = np.ascontiguousarray(pixels)
    pixel_size = pixels.nbytes

    # Staging buffer
    staging_buf, staging_mem = create_buffer(
        device,
        physical_device,
        pixel_size,
        vk.VK_BUFFER_USAGE_TRANSFER_SRC_BIT,
        vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
    )
    try:
        dst = vk.vkMapMemory(device, staging_mem, 0, pixel_size, 0)
        try:
            ffi.memmove(dst, ffi.cast("void*", pixels.ctypes.data), pixel_size)
        finally:
            vk.vkUnmapMemory(device, staging_mem)

        # SHADER_READ_ONLY → TRANSFER_DST
        transition_image_layout(
            device,
            queue,
            cmd_pool,
            image,
            vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
            vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
        )

        # Copy staging → image
        cmd = begin_single_time_commands(device, cmd_pool)
        region = vk.VkBufferImageCopy(
            bufferOffset=0,
            # 0/0 means the buffer rows are tightly packed to imageExtent.
            bufferRowLength=0,
            bufferImageHeight=0,
            imageSubresource=vk.VkImageSubresourceLayers(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                mipLevel=0,
                baseArrayLayer=0,
                layerCount=1,
            ),
            imageOffset=vk.VkOffset3D(x=0, y=0, z=0),
            imageExtent=vk.VkExtent3D(width=width, height=height, depth=1),
        )
        vk.vkCmdCopyBufferToImage(cmd, staging_buf, image, vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL, 1, [region])
        end_single_time_commands(device, queue, cmd_pool, cmd)

        # TRANSFER_DST → SHADER_READ_ONLY
        transition_image_layout(
            device,
            queue,
            cmd_pool,
            image,
            vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
            vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
        )
    finally:
        # Cleanup staging (on success and on failure alike)
        vk.vkDestroyBuffer(device, staging_buf, None)
        vk.vkFreeMemory(device, staging_mem, None)
def create_indirect_buffer(
    device: Any,
    physical_device: Any,
    draw_count: int,
) -> tuple[Any, Any]:
    """Create a host-visible buffer for VkDrawIndexedIndirectCommand array.

    Args:
        device: Logical device used to create the buffer.
        physical_device: Physical device queried for memory types.
        draw_count: Number of indirect draw commands the buffer must hold.
            A count of 0 still allocates room for one command, because a
            zero-byte buffer cannot be created.

    Returns:
        ``(buffer, memory)`` for a host-visible, host-coherent buffer with
        indirect-buffer usage.

    Raises:
        ValueError: If ``draw_count`` is negative.
    """
    if draw_count < 0:
        raise ValueError(f"draw_count must be non-negative, got {draw_count}")

    # VkDrawIndexedIndirectCommand has five 32-bit fields = 20 bytes.
    command_size = 5 * 4
    buffer_size = max(draw_count, 1) * command_size
    return create_buffer(
        device,
        physical_device,
        buffer_size,
        vk.VK_BUFFER_USAGE_INDIRECT_BUFFER_BIT,
        vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
    )