"""Buffer and image allocation helpers."""
from __future__ import annotations
import logging
from typing import Any
import numpy as np
import vulkan as vk
log = logging.getLogger(__name__)
__all__ = [
"create_buffer",
"create_image",
"create_sampler",
"transition_image_layout",
"upload_numpy",
"upload_image_data",
"update_image_data",
"create_indirect_buffer",
"begin_single_time_commands",
"end_single_time_commands",
]
def _find_memory_type(physical_device: Any, type_filter: int, properties: int) -> int:
mem_props = vk.vkGetPhysicalDeviceMemoryProperties(physical_device)
for i in range(mem_props.memoryTypeCount):
if (type_filter & (1 << i)) and (mem_props.memoryTypes[i].propertyFlags & properties) == properties:
return i
raise RuntimeError("Failed to find suitable memory type")
[docs]
def create_buffer(
device: Any,
physical_device: Any,
size: int,
usage: int,
memory_flags: int,
) -> tuple[Any, Any]:
"""Create a VkBuffer with bound memory. Returns (buffer, memory)."""
buf_info = vk.VkBufferCreateInfo(
size=size,
usage=usage,
sharingMode=vk.VK_SHARING_MODE_EXCLUSIVE,
)
buffer = vk.vkCreateBuffer(device, buf_info, None)
mem_reqs = vk.vkGetBufferMemoryRequirements(device, buffer)
alloc_info = vk.VkMemoryAllocateInfo(
allocationSize=mem_reqs.size,
memoryTypeIndex=_find_memory_type(physical_device, mem_reqs.memoryTypeBits, memory_flags),
)
memory = vk.vkAllocateMemory(device, alloc_info, None)
vk.vkBindBufferMemory(device, buffer, memory, 0)
return buffer, memory
[docs]
def create_image(
device: Any,
physical_device: Any,
width: int,
height: int,
fmt: int,
usage: int,
) -> tuple[Any, Any]:
"""Create a VkImage with bound memory. Returns (image, memory)."""
img_info = vk.VkImageCreateInfo(
imageType=vk.VK_IMAGE_TYPE_2D,
format=fmt,
extent=vk.VkExtent3D(width=width, height=height, depth=1),
mipLevels=1,
arrayLayers=1,
samples=vk.VK_SAMPLE_COUNT_1_BIT,
tiling=vk.VK_IMAGE_TILING_OPTIMAL,
usage=usage,
sharingMode=vk.VK_SHARING_MODE_EXCLUSIVE,
initialLayout=vk.VK_IMAGE_LAYOUT_UNDEFINED,
)
image = vk.vkCreateImage(device, img_info, None)
mem_reqs = vk.vkGetImageMemoryRequirements(device, image)
alloc_info = vk.VkMemoryAllocateInfo(
allocationSize=mem_reqs.size,
memoryTypeIndex=_find_memory_type(
physical_device,
mem_reqs.memoryTypeBits,
vk.VK_MEMORY_PROPERTY_DEVICE_LOCAL_BIT,
),
)
memory = vk.vkAllocateMemory(device, alloc_info, None)
vk.vkBindImageMemory(device, image, memory, 0)
return image, memory
[docs]
def create_sampler(device: Any, filter_mode: int = vk.VK_FILTER_LINEAR) -> Any:
"""Create a VkSampler with configurable filtering."""
sampler_info = vk.VkSamplerCreateInfo(
magFilter=filter_mode,
minFilter=filter_mode,
mipmapMode=vk.VK_SAMPLER_MIPMAP_MODE_LINEAR,
addressModeU=vk.VK_SAMPLER_ADDRESS_MODE_REPEAT,
addressModeV=vk.VK_SAMPLER_ADDRESS_MODE_REPEAT,
addressModeW=vk.VK_SAMPLER_ADDRESS_MODE_REPEAT,
mipLodBias=0.0,
anisotropyEnable=False,
maxAnisotropy=1.0,
minLod=0.0,
maxLod=0.0,
)
return vk.vkCreateSampler(device, sampler_info, None)
[docs]
def transition_image_layout(
device: Any,
queue: Any,
cmd_pool: Any,
image: Any,
old_layout: int,
new_layout: int,
aspect_mask: int = vk.VK_IMAGE_ASPECT_COLOR_BIT,
) -> None:
"""Transition image layout via one-time command buffer."""
alloc_info = vk.VkCommandBufferAllocateInfo(
commandPool=cmd_pool,
level=vk.VK_COMMAND_BUFFER_LEVEL_PRIMARY,
commandBufferCount=1,
)
cmd = vk.vkAllocateCommandBuffers(device, alloc_info)[0]
vk.vkBeginCommandBuffer(
cmd,
vk.VkCommandBufferBeginInfo(
flags=vk.VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT,
),
)
# Determine access masks and stages from layouts
src_access = 0
src_stage = vk.VK_PIPELINE_STAGE_TOP_OF_PIPE_BIT
dst_access = 0
dst_stage = vk.VK_PIPELINE_STAGE_BOTTOM_OF_PIPE_BIT
if old_layout == vk.VK_IMAGE_LAYOUT_COLOR_ATTACHMENT_OPTIMAL:
src_access = vk.VK_ACCESS_COLOR_ATTACHMENT_WRITE_BIT
src_stage = vk.VK_PIPELINE_STAGE_COLOR_ATTACHMENT_OUTPUT_BIT
elif old_layout == vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL:
src_access = vk.VK_ACCESS_TRANSFER_WRITE_BIT
src_stage = vk.VK_PIPELINE_STAGE_TRANSFER_BIT
elif old_layout == vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL:
src_access = vk.VK_ACCESS_SHADER_READ_BIT
src_stage = vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT
if new_layout == vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL:
dst_access = vk.VK_ACCESS_SHADER_READ_BIT
dst_stage = vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT
elif new_layout == vk.VK_IMAGE_LAYOUT_COLOR_ATTACHMENT_OPTIMAL:
dst_access = vk.VK_ACCESS_COLOR_ATTACHMENT_WRITE_BIT
dst_stage = vk.VK_PIPELINE_STAGE_COLOR_ATTACHMENT_OUTPUT_BIT
elif new_layout == vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL:
dst_access = vk.VK_ACCESS_TRANSFER_WRITE_BIT
dst_stage = vk.VK_PIPELINE_STAGE_TRANSFER_BIT
barrier = vk.VkImageMemoryBarrier(
srcAccessMask=src_access,
dstAccessMask=dst_access,
oldLayout=old_layout,
newLayout=new_layout,
srcQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
dstQueueFamilyIndex=vk.VK_QUEUE_FAMILY_IGNORED,
image=image,
subresourceRange=vk.VkImageSubresourceRange(
aspectMask=aspect_mask,
baseMipLevel=0,
levelCount=1,
baseArrayLayer=0,
layerCount=1,
),
)
vk.vkCmdPipelineBarrier(cmd, src_stage, dst_stage, 0, 0, None, 0, None, 1, [barrier])
vk.vkEndCommandBuffer(cmd)
submit = vk.VkSubmitInfo(commandBufferCount=1, pCommandBuffers=[cmd])
vk.vkQueueSubmit(queue, 1, [submit], None)
vk.vkQueueWaitIdle(queue)
vk.vkFreeCommandBuffers(device, cmd_pool, 1, [cmd])
[docs]
def upload_numpy(device: Any, memory: Any, data: np.ndarray) -> None:
"""Map device memory and copy a numpy array into it."""
size = data.nbytes
src = vk.ffi.cast("void*", data.ctypes.data)
dst = vk.vkMapMemory(device, memory, 0, size, 0)
vk.ffi.memmove(dst, src, size)
vk.vkUnmapMemory(device, memory)
[docs]
def begin_single_time_commands(device: Any, cmd_pool: Any) -> Any:
"""Allocate and begin a one-time command buffer."""
alloc_info = vk.VkCommandBufferAllocateInfo(
commandPool=cmd_pool,
level=vk.VK_COMMAND_BUFFER_LEVEL_PRIMARY,
commandBufferCount=1,
)
cmd = vk.vkAllocateCommandBuffers(device, alloc_info)[0]
vk.vkBeginCommandBuffer(
cmd,
vk.VkCommandBufferBeginInfo(
flags=vk.VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT,
),
)
return cmd
[docs]
def end_single_time_commands(device: Any, queue: Any, cmd_pool: Any, cmd: Any) -> None:
"""End, submit, wait, and free a one-time command buffer."""
vk.vkEndCommandBuffer(cmd)
submit = vk.VkSubmitInfo(commandBufferCount=1, pCommandBuffers=[cmd])
vk.vkQueueSubmit(queue, 1, [submit], None)
vk.vkQueueWaitIdle(queue)
vk.vkFreeCommandBuffers(device, cmd_pool, 1, [cmd])
[docs]
def upload_image_data(
device: Any,
physical_device: Any,
queue: Any,
cmd_pool: Any,
pixels: np.ndarray,
width: int,
height: int,
fmt: int = vk.VK_FORMAT_R8G8B8A8_UNORM,
) -> tuple[Any, Any]:
"""Upload pixel data to a device-local image via staging buffer.
Args:
pixels: Contiguous RGBA uint8 array, shape (height, width, 4).
Returns: (image, memory)
"""
ffi = vk.ffi
pixel_size = pixels.nbytes
# Staging buffer
staging_buf, staging_mem = create_buffer(
device,
physical_device,
pixel_size,
vk.VK_BUFFER_USAGE_TRANSFER_SRC_BIT,
vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
)
dst = vk.vkMapMemory(device, staging_mem, 0, pixel_size, 0)
ffi.memmove(dst, ffi.cast("void*", pixels.ctypes.data), pixel_size)
vk.vkUnmapMemory(device, staging_mem)
# Device-local image
image, image_mem = create_image(
device,
physical_device,
width,
height,
fmt,
vk.VK_IMAGE_USAGE_TRANSFER_DST_BIT | vk.VK_IMAGE_USAGE_SAMPLED_BIT,
)
# Transition UNDEFINED → TRANSFER_DST
transition_image_layout(
device, queue, cmd_pool, image, vk.VK_IMAGE_LAYOUT_UNDEFINED, vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL
)
# Copy staging buffer → image
cmd = begin_single_time_commands(device, cmd_pool)
region = vk.VkBufferImageCopy(
bufferOffset=0,
bufferRowLength=0,
bufferImageHeight=0,
imageSubresource=vk.VkImageSubresourceLayers(
aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
mipLevel=0,
baseArrayLayer=0,
layerCount=1,
),
imageOffset=vk.VkOffset3D(x=0, y=0, z=0),
imageExtent=vk.VkExtent3D(width=width, height=height, depth=1),
)
vk.vkCmdCopyBufferToImage(
cmd,
staging_buf,
image,
vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
1,
[region],
)
end_single_time_commands(device, queue, cmd_pool, cmd)
# Transition TRANSFER_DST → SHADER_READ_ONLY
transition_image_layout(
device,
queue,
cmd_pool,
image,
vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
)
# Cleanup staging
vk.vkDestroyBuffer(device, staging_buf, None)
vk.vkFreeMemory(device, staging_mem, None)
return image, image_mem
[docs]
def update_image_data(
device: Any,
physical_device: Any,
queue: Any,
cmd_pool: Any,
image: Any,
pixels: np.ndarray,
width: int,
height: int,
) -> None:
"""Re-upload pixel data to an existing VkImage (same dimensions).
Transitions the image from SHADER_READ_ONLY → TRANSFER_DST, copies
the new pixel data via a staging buffer, then transitions back to
SHADER_READ_ONLY.
"""
ffi = vk.ffi
pixel_size = pixels.nbytes
# Staging buffer
staging_buf, staging_mem = create_buffer(
device, physical_device, pixel_size,
vk.VK_BUFFER_USAGE_TRANSFER_SRC_BIT,
vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
)
dst = vk.vkMapMemory(device, staging_mem, 0, pixel_size, 0)
ffi.memmove(dst, ffi.cast("void*", pixels.ctypes.data), pixel_size)
vk.vkUnmapMemory(device, staging_mem)
# SHADER_READ_ONLY → TRANSFER_DST
transition_image_layout(
device, queue, cmd_pool, image,
vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
)
# Copy staging → image
cmd = begin_single_time_commands(device, cmd_pool)
region = vk.VkBufferImageCopy(
bufferOffset=0, bufferRowLength=0, bufferImageHeight=0,
imageSubresource=vk.VkImageSubresourceLayers(
aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
mipLevel=0, baseArrayLayer=0, layerCount=1,
),
imageOffset=vk.VkOffset3D(x=0, y=0, z=0),
imageExtent=vk.VkExtent3D(width=width, height=height, depth=1),
)
vk.vkCmdCopyBufferToImage(cmd, staging_buf, image, vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL, 1, [region])
end_single_time_commands(device, queue, cmd_pool, cmd)
# TRANSFER_DST → SHADER_READ_ONLY
transition_image_layout(
device, queue, cmd_pool, image,
vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
)
# Cleanup staging
vk.vkDestroyBuffer(device, staging_buf, None)
vk.vkFreeMemory(device, staging_mem, None)
[docs]
def create_indirect_buffer(
device: Any,
physical_device: Any,
draw_count: int,
) -> tuple[Any, Any]:
"""Create a host-visible buffer for VkDrawIndexedIndirectCommand array."""
buffer_size = draw_count * 20 # 5 x uint32 = 20 bytes per command
return create_buffer(
device,
physical_device,
buffer_size,
vk.VK_BUFFER_USAGE_INDIRECT_BUFFER_BIT,
vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
)