"""2D text overlay pass using MSDF atlas — renders after 3D geometry."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
import numpy as np
import vulkan as vk
from ..gpu.descriptors import (
create_texture_descriptor_layout,
write_texture_descriptor,
)
from ..gpu.memory import (
create_buffer,
upload_image_data,
upload_numpy,
)
from .pass_helpers import create_linear_sampler, create_sampler_descriptor_pool, load_shader_modules
__all__ = ["TextPass"]

log = logging.getLogger(__name__)

# The "shaders" directory is located by walking five ``.parent`` hops up from
# this file's resolved path.
SHADER_DIR = (
    Path(__file__).resolve().parent.parent.parent.parent.parent / "shaders"
)

# Geometry layout: each character is one quad drawn as two triangles.
_VERTS_PER_CHAR = 4
_INDICES_PER_CHAR = 6
_INDEX_SIZE_BYTES = 4  # one uint32 index

# Vertex format: pos(vec2) + uv(vec2) + colour(vec4) = 32 bytes
VERTEX_STRIDE = 32
# Upper bound on characters whose geometry fits in the buffers sized below.
MAX_CHARS = 4096
VERTEX_BUF_SIZE = MAX_CHARS * _VERTS_PER_CHAR * VERTEX_STRIDE
INDEX_BUF_SIZE = MAX_CHARS * _INDICES_PER_CHAR * _INDEX_SIZE_BYTES
[docs]
class TextPass:
    """GPU text rendering with MSDF atlas, proper CFFI pipeline.

    Lifecycle as implemented by this class:

    1. ``setup()`` creates shaders, sampler, descriptor set, pipeline and the
       host-visible vertex/index buffers.
    2. ``upload_atlas()`` / ``upload_atlas_if_dirty()`` put the glyph atlas on
       the GPU and bind it to the descriptor set.
    3. ``upload_geometry()`` copies the current vertex/index data.
    4. ``render()`` records the draw into an active render pass.
    5. ``cleanup()`` destroys everything and returns the object to its
       pre-``setup()`` state, so ``setup()`` may be called again.

    Every method other than ``setup()`` is a no-op while the pass is not
    ready (before ``setup()`` or after ``cleanup()``).
    """

    def __init__(self, engine: Any):
        self._engine = engine
        self._px_range: float = 4.0  # SDF range in atlas pixels
        self._reset_state()

    def _reset_state(self) -> None:
        """Set every GPU handle and piece of upload bookkeeping to "nothing"."""
        self._pipeline: Any = None
        self._pipeline_layout: Any = None
        self._vert_module: Any = None
        self._frag_module: Any = None
        self._vertex_buffer: Any = None
        self._vertex_memory: Any = None
        self._index_buffer: Any = None
        self._index_memory: Any = None
        self._index_count = 0
        self._sampler: Any = None
        self._descriptor_layout: Any = None
        self._descriptor_pool: Any = None
        self._descriptor_set: Any = None
        self._atlas_image: Any = None
        self._atlas_memory: Any = None
        self._atlas_view: Any = None
        self._atlas_version = 0  # Tracks which atlas version is on GPU
        self._ready = False

    def setup(self) -> None:
        """Initialize GPU resources.

        Reads ``ctx.device``, ``ctx.physical_device``, ``render_pass`` and
        ``extent`` from the engine passed to the constructor.
        """
        e = self._engine
        device = e.ctx.device
        phys = e.ctx.physical_device

        # Compile shaders
        self._vert_module, self._frag_module = load_shader_modules(
            device, SHADER_DIR, "text.vert", "text.frag",
        )

        # Sampler (linear filtering for MSDF)
        self._sampler = create_linear_sampler(device)

        # Descriptor layout + pool + set for atlas texture
        self._descriptor_layout = create_texture_descriptor_layout(device, max_textures=1)
        self._descriptor_pool, desc_sets = create_sampler_descriptor_pool(
            device, self._descriptor_layout,
        )
        self._descriptor_set = desc_sets[0]

        # Pipeline (CFFI pattern matching create_ui_pipeline)
        self._pipeline, self._pipeline_layout = _create_text_pipeline(
            device, self._vert_module, self._frag_module,
            e.render_pass, e.extent, self._descriptor_layout,
        )

        # Vertex/index buffers (host-visible for per-frame updates)
        host_visible = (
            vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT
            | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT
        )
        self._vertex_buffer, self._vertex_memory = create_buffer(
            device, phys, VERTEX_BUF_SIZE,
            vk.VK_BUFFER_USAGE_VERTEX_BUFFER_BIT,
            host_visible,
        )
        self._index_buffer, self._index_memory = create_buffer(
            device, phys, INDEX_BUF_SIZE,
            vk.VK_BUFFER_USAGE_INDEX_BUFFER_BIT,
            host_visible,
        )
        self._ready = True

    @property
    def pipeline(self) -> Any:
        """Graphics pipeline handle, or ``None`` when not set up."""
        return self._pipeline

    @property
    def pipeline_layout(self) -> Any:
        """Pipeline layout handle, or ``None`` when not set up."""
        return self._pipeline_layout

    @property
    def descriptor_set(self) -> Any:
        """Descriptor set the atlas is bound to, or ``None`` when not set up."""
        return self._descriptor_set

    @property
    def px_range(self) -> float:
        """SDF range in atlas pixels, as last passed to ``upload_atlas()``."""
        return self._px_range

    @property
    def atlas_version(self) -> int:
        """Version of the atlas currently on the GPU (0 = none uploaded)."""
        return self._atlas_version

    def upload_atlas_if_dirty(self) -> None:
        """Upload the shared TextRenderer's atlas if its version changed.

        Also checks Draw2D's atlas (same shared atlas after unification).
        Must be called outside the render pass (staging transfers).
        """
        if not self._ready:
            return
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import — confirm.
        from ..draw2d import Draw2D

        atlas = Draw2D._font
        if atlas is None:
            return
        if atlas.version <= self._atlas_version:
            return
        self.upload_atlas(atlas.atlas, version=atlas.version, px_range=atlas.sdf_range)

    def upload_atlas(self, atlas_data: np.ndarray, version: int = 1, px_range: float = 4.0) -> None:
        """Upload MSDF atlas (RGBA uint8) to GPU via staging buffer.

        Skips the upload if the pass is not ready or the GPU already holds
        this version or a newer one. ``px_range`` is recorded even when the
        upload is skipped.

        On a version change the new image and view are created and bound to
        the descriptor set first; only then are the previous image, view and
        memory destroyed. If creating the replacement raises, the previous
        atlas and ``atlas_version`` are left untouched and the partially
        created replacement is released.

        Args:
            atlas_data: ``(H, W)``, ``(H, W, 3)`` or ``(H, W, 4)`` array;
                the first two forms are expanded to four channels.
            version: Atlas version; must be greater than the current
                ``atlas_version`` for an upload to happen.
            px_range: SDF range in atlas pixels.
        """
        self._px_range = px_range
        if not self._ready or version <= self._atlas_version:
            return

        e = self._engine
        device = e.ctx.device
        rgba = self._as_rgba(atlas_data)
        h, w = rgba.shape[:2]

        previous = (self._atlas_view, self._atlas_image, self._atlas_memory)
        if any(previous):
            # Must wait for GPU to finish — the old view may still be
            # referenced by in-flight command buffers / descriptor sets, and
            # the descriptor set is about to be rewritten.
            vk.vkDeviceWaitIdle(device)

        # Upload via staging buffer
        new_image, new_memory = upload_image_data(
            device, e.ctx.physical_device, e.ctx.graphics_queue, e.ctx.command_pool,
            rgba, w, h, vk.VK_FORMAT_R8G8B8A8_UNORM,
        )
        new_view = None
        try:
            view_ci = vk.VkImageViewCreateInfo(
                image=new_image,
                viewType=vk.VK_IMAGE_VIEW_TYPE_2D,
                format=vk.VK_FORMAT_R8G8B8A8_UNORM,
                subresourceRange=vk.VkImageSubresourceRange(
                    aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                    baseMipLevel=0, levelCount=1,
                    baseArrayLayer=0, layerCount=1,
                ),
            )
            new_view = vk.vkCreateImageView(device, view_ci, None)
            # Bind to descriptor set
            write_texture_descriptor(
                device, self._descriptor_set,
                0, new_view, self._sampler,
            )
        except Exception:
            # Do not leak the half-built replacement; the old atlas stays live.
            self._destroy_atlas_resources(device, new_view, new_image, new_memory)
            raise

        self._atlas_view = new_view
        self._atlas_image = new_image
        self._atlas_memory = new_memory
        self._atlas_version = version
        # The descriptor set no longer references the old view, so it can go.
        self._destroy_atlas_resources(device, *previous)

    @staticmethod
    def _as_rgba(atlas_data: np.ndarray) -> np.ndarray:
        """Return ``atlas_data`` as a C-contiguous array with four channels.

        2-D input is replicated into all four channels; 3-channel input gets
        an opaque (255) alpha channel; anything else is passed through.
        """
        if atlas_data.ndim == 2:
            rgba = np.stack([atlas_data] * 4, axis=-1)
        elif atlas_data.shape[2] == 3:
            h, w = atlas_data.shape[:2]
            rgba = np.zeros((h, w, 4), dtype=np.uint8)
            rgba[:, :, :3] = atlas_data
            rgba[:, :, 3] = 255
        else:
            rgba = atlas_data
        return np.ascontiguousarray(rgba)

    @staticmethod
    def _destroy_atlas_resources(device: Any, view: Any, image: Any, memory: Any) -> None:
        """Destroy an atlas view, then its image, then free the memory.

        Falsy handles are skipped. The view goes first because it was
        created from the image.
        """
        if view:
            vk.vkDestroyImageView(device, view, None)
        if image:
            vk.vkDestroyImage(device, image, None)
        if memory:
            vk.vkFreeMemory(device, memory, None)

    def upload_geometry(self, vertices: np.ndarray, indices: np.ndarray) -> None:
        """Upload per-frame text vertex/index data.

        When the pass is not ready, or either array is ``None`` or empty,
        nothing is uploaded and the index count is set to 0 so ``render()``
        draws nothing. Input beyond ``MAX_CHARS`` characters is truncated to
        the buffer capacity.
        """
        nothing_to_draw = (
            not self._ready
            or vertices is None
            or indices is None
            or len(vertices) == 0
            or len(indices) == 0
        )
        if nothing_to_draw:
            self._index_count = 0
            return

        max_verts = MAX_CHARS * 4
        max_idx = MAX_CHARS * 6
        if len(vertices) > max_verts:
            log.warning("TextPass overflow: %d verts (max %d), truncating", len(vertices), max_verts)
            vertices = vertices[:max_verts]
        # Slicing is a no-op when the array already fits.
        indices = indices[:max_idx]

        device = self._engine.ctx.device
        upload_numpy(device, self._vertex_memory, vertices)
        upload_numpy(device, self._index_memory, indices)
        self._index_count = len(indices)

    def render(self, cmd: Any, width: int, height: int) -> None:
        """Record text draw commands into the active render pass.

        Does nothing when the pass is not ready, no indices were uploaded,
        or no atlas has been uploaded yet.
        """
        if not self._ready or self._index_count == 0 or self._atlas_version == 0:
            return

        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline)

        # Bind atlas descriptor
        vk.vkCmdBindDescriptorSets(
            cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline_layout,
            0, 1, [self._descriptor_set], 0, None,
        )

        # Push screen size + px_range (three float32 = 12 bytes)
        pc_data = np.array([width, height, self._px_range], dtype=np.float32)
        self._engine.push_constants(cmd, self._pipeline_layout, pc_data.tobytes())

        # Viewport/scissor
        vk_viewport = vk.VkViewport(
            x=0.0, y=0.0,
            width=float(width), height=float(height),
            minDepth=0.0, maxDepth=1.0,
        )
        vk.vkCmdSetViewport(cmd, 0, 1, [vk_viewport])
        scissor = vk.VkRect2D(
            offset=vk.VkOffset2D(x=0, y=0),
            extent=vk.VkExtent2D(width=width, height=height),
        )
        vk.vkCmdSetScissor(cmd, 0, 1, [scissor])

        # Bind buffers and draw
        vk.vkCmdBindVertexBuffers(cmd, 0, 1, [self._vertex_buffer], [0])
        vk.vkCmdBindIndexBuffer(cmd, self._index_buffer, 0, vk.VK_INDEX_TYPE_UINT32)
        vk.vkCmdDrawIndexed(cmd, self._index_count, 1, 0, 0, 0)

    def cleanup(self) -> None:
        """Release all GPU resources and return to the pre-``setup()`` state.

        No-op when the pass is not ready. Afterwards every handle is ``None``
        and ``atlas_version`` is 0, so a later ``setup()`` followed by
        ``upload_atlas_if_dirty()`` re-uploads the atlas instead of skipping
        it as already current.

        This method does not wait for the device itself.
        NOTE(review): presumably the caller ensures the GPU is idle — confirm.
        """
        if not self._ready:
            return
        device = self._engine.ctx.device

        self._destroy_atlas_resources(
            device, self._atlas_view, self._atlas_image, self._atlas_memory,
        )
        for handle, destroy in (
            (self._vertex_buffer, vk.vkDestroyBuffer),
            (self._index_buffer, vk.vkDestroyBuffer),
            (self._pipeline, vk.vkDestroyPipeline),
            (self._pipeline_layout, vk.vkDestroyPipelineLayout),
            (self._vert_module, vk.vkDestroyShaderModule),
            (self._frag_module, vk.vkDestroyShaderModule),
            (self._sampler, vk.vkDestroySampler),
            (self._descriptor_layout, vk.vkDestroyDescriptorSetLayout),
            (self._descriptor_pool, vk.vkDestroyDescriptorPool),
        ):
            if handle:
                destroy(device, handle, None)
        for mem in (self._vertex_memory, self._index_memory):
            if mem:
                vk.vkFreeMemory(device, mem, None)

        # Drop the now-dangling handles and stale version/index bookkeeping.
        self._reset_state()
def _create_text_pipeline(
    device: Any,
    vert_module: Any,
    frag_module: Any,
    render_pass: Any,
    extent: tuple[int, int],
    descriptor_layout: Any,
) -> tuple[Any, Any]:
    """Create text rendering pipeline using raw CFFI.

    Vertex format: pos(vec2) + uv(vec2) + colour(vec4) = 32 bytes.
    Push constant: screen_size(vec2) + px_range(float) = 12 bytes.
    Descriptor set 0: MSDF atlas (combined image sampler).
    No depth test, alpha blending, cull none.

    Args:
        device: Vulkan device handle.
        vert_module: Vertex shader module (entry point ``main``).
        frag_module: Fragment shader module (entry point ``main``).
        render_pass: Render pass the pipeline is created for.
        extent: ``(width, height)`` used for the static viewport/scissor.
        descriptor_layout: Descriptor set layout for set 0.

    Returns:
        ``(pipeline, pipeline_layout)`` handles.

    Raises:
        RuntimeError: If ``vkCreatePipelineLayout`` or
            ``vkCreateGraphicsPipelines`` does not return ``VK_SUCCESS``.
            The pipeline layout is destroyed before a pipeline-creation
            failure propagates, so nothing is leaked.
    """
    pipeline_layout = _create_text_pipeline_layout(device, descriptor_layout)
    try:
        pipeline = _create_text_graphics_pipeline(
            device, vert_module, frag_module, render_pass, extent, pipeline_layout,
        )
    except Exception:
        # The caller only receives the layout on success; release it here so
        # a failed pipeline build does not leak it.
        vk.vkDestroyPipelineLayout(device, pipeline_layout, None)
        raise
    log.debug("Text pipeline created")
    return pipeline, pipeline_layout


def _create_text_pipeline_layout(device: Any, descriptor_layout: Any) -> Any:
    """Create the pipeline layout: one descriptor set + a 12-byte push range."""
    ffi = vk.ffi

    # Push constant range: vec2 screen_size(8) + float px_range(4) = 12 bytes
    push_range = ffi.new("VkPushConstantRange*")
    push_range.stageFlags = vk.VK_SHADER_STAGE_VERTEX_BIT | vk.VK_SHADER_STAGE_FRAGMENT_BIT
    push_range.offset = 0
    push_range.size = 12

    # Pipeline layout with descriptor set + push constants
    set_layouts = ffi.new("VkDescriptorSetLayout[1]", [descriptor_layout])
    layout_ci = ffi.new("VkPipelineLayoutCreateInfo*")
    layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO
    layout_ci.setLayoutCount = 1
    layout_ci.pSetLayouts = set_layouts
    layout_ci.pushConstantRangeCount = 1
    layout_ci.pPushConstantRanges = push_range

    layout_out = ffi.new("VkPipelineLayout*")
    result = vk._vulkan._callApi(
        vk._vulkan.lib.vkCreatePipelineLayout,
        device, layout_ci, ffi.NULL, layout_out,
    )
    if result != vk.VK_SUCCESS:
        raise RuntimeError(f"vkCreatePipelineLayout failed: {result}")
    return layout_out[0]


def _create_text_graphics_pipeline(
    device: Any,
    vert_module: Any,
    frag_module: Any,
    render_pass: Any,
    extent: tuple[int, int],
    pipeline_layout: Any,
) -> Any:
    """Fill VkGraphicsPipelineCreateInfo and create the text pipeline.

    Every ``ffi.new`` object stays bound to a local of this function until
    ``vkCreateGraphicsPipelines`` has returned: storing a cffi pointer in a
    struct field does not keep the pointed-to memory alive, so the
    sub-structures must not be built in helpers that return before the call.
    """
    ffi = vk.ffi
    pi = ffi.new("VkGraphicsPipelineCreateInfo*")
    pi.sType = vk.VK_STRUCTURE_TYPE_GRAPHICS_PIPELINE_CREATE_INFO

    # Shader stages — both use the entry point "main"
    stages = ffi.new("VkPipelineShaderStageCreateInfo[2]")
    main_name = ffi.new("char[]", b"main")
    stage_specs = (
        (vk.VK_SHADER_STAGE_VERTEX_BIT, vert_module),
        (vk.VK_SHADER_STAGE_FRAGMENT_BIT, frag_module),
    )
    for slot, (stage_bit, module) in enumerate(stage_specs):
        stages[slot].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
        stages[slot].stage = stage_bit
        stages[slot].module = module
        stages[slot].pName = main_name
    pi.stageCount = 2
    pi.pStages = stages

    # Vertex input: pos(vec2) + uv(vec2) + colour(vec4) = 32 bytes
    binding_desc = ffi.new("VkVertexInputBindingDescription*")
    binding_desc.binding = 0
    binding_desc.stride = VERTEX_STRIDE
    binding_desc.inputRate = vk.VK_VERTEX_INPUT_RATE_VERTEX

    # (format, byte offset) per shader location, in location order
    attr_specs = (
        (vk.VK_FORMAT_R32G32_SFLOAT, 0),         # location 0: pos
        (vk.VK_FORMAT_R32G32_SFLOAT, 8),         # location 1: uv
        (vk.VK_FORMAT_R32G32B32A32_SFLOAT, 16),  # location 2: colour
    )
    attr_descs = ffi.new("VkVertexInputAttributeDescription[3]")
    for location, (fmt, offset) in enumerate(attr_specs):
        attr_descs[location].location = location
        attr_descs[location].binding = 0
        attr_descs[location].format = fmt
        attr_descs[location].offset = offset

    vi = ffi.new("VkPipelineVertexInputStateCreateInfo*")
    vi.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VERTEX_INPUT_STATE_CREATE_INFO
    vi.vertexBindingDescriptionCount = 1
    vi.pVertexBindingDescriptions = binding_desc
    vi.vertexAttributeDescriptionCount = 3
    vi.pVertexAttributeDescriptions = attr_descs
    pi.pVertexInputState = vi

    # Input assembly
    ia = ffi.new("VkPipelineInputAssemblyStateCreateInfo*")
    ia.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_INPUT_ASSEMBLY_STATE_CREATE_INFO
    ia.topology = vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST
    pi.pInputAssemblyState = ia

    # Viewport state — static values taken from `extent`; viewport and
    # scissor are also declared dynamic further down.
    vps = ffi.new("VkPipelineViewportStateCreateInfo*")
    vps.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VIEWPORT_STATE_CREATE_INFO
    vps.viewportCount = 1
    viewport = ffi.new("VkViewport*")
    viewport.width = float(extent[0])
    viewport.height = float(extent[1])
    viewport.maxDepth = 1.0
    vps.pViewports = viewport
    scissor = ffi.new("VkRect2D*")
    scissor.extent.width = extent[0]
    scissor.extent.height = extent[1]
    vps.scissorCount = 1
    vps.pScissors = scissor
    pi.pViewportState = vps

    # Rasterization — no culling for 2D text
    rs = ffi.new("VkPipelineRasterizationStateCreateInfo*")
    rs.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_RASTERIZATION_STATE_CREATE_INFO
    rs.polygonMode = vk.VK_POLYGON_MODE_FILL
    rs.lineWidth = 1.0
    rs.cullMode = vk.VK_CULL_MODE_NONE
    pi.pRasterizationState = rs

    # Multisample — single sample
    ms = ffi.new("VkPipelineMultisampleStateCreateInfo*")
    ms.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_MULTISAMPLE_STATE_CREATE_INFO
    ms.rasterizationSamples = vk.VK_SAMPLE_COUNT_1_BIT
    pi.pMultisampleState = ms

    # No depth test for overlay text
    dss = ffi.new("VkPipelineDepthStencilStateCreateInfo*")
    dss.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DEPTH_STENCIL_STATE_CREATE_INFO
    dss.depthTestEnable = 0
    dss.depthWriteEnable = 0
    pi.pDepthStencilState = dss

    # Colour blend — alpha blending
    cba = ffi.new("VkPipelineColorBlendAttachmentState*")
    cba.blendEnable = 1
    cba.srcColorBlendFactor = vk.VK_BLEND_FACTOR_SRC_ALPHA
    cba.dstColorBlendFactor = vk.VK_BLEND_FACTOR_ONE_MINUS_SRC_ALPHA
    cba.colorBlendOp = vk.VK_BLEND_OP_ADD
    cba.srcAlphaBlendFactor = vk.VK_BLEND_FACTOR_ONE
    cba.dstAlphaBlendFactor = vk.VK_BLEND_FACTOR_ZERO
    cba.alphaBlendOp = vk.VK_BLEND_OP_ADD
    cba.colorWriteMask = (
        vk.VK_COLOR_COMPONENT_R_BIT | vk.VK_COLOR_COMPONENT_G_BIT
        | vk.VK_COLOR_COMPONENT_B_BIT | vk.VK_COLOR_COMPONENT_A_BIT
    )
    cb = ffi.new("VkPipelineColorBlendStateCreateInfo*")
    cb.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_COLOR_BLEND_STATE_CREATE_INFO
    cb.attachmentCount = 1
    cb.pAttachments = cba
    pi.pColorBlendState = cb

    # Dynamic state (viewport + scissor)
    dyn_states = ffi.new("VkDynamicState[2]", [
        vk.VK_DYNAMIC_STATE_VIEWPORT, vk.VK_DYNAMIC_STATE_SCISSOR,
    ])
    ds = ffi.new("VkPipelineDynamicStateCreateInfo*")
    ds.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DYNAMIC_STATE_CREATE_INFO
    ds.dynamicStateCount = 2
    ds.pDynamicStates = dyn_states
    pi.pDynamicState = ds

    pi.layout = pipeline_layout
    pi.renderPass = render_pass

    pipeline_out = ffi.new("VkPipeline*")
    result = vk._vulkan._callApi(
        vk._vulkan.lib.vkCreateGraphicsPipelines,
        device, ffi.NULL, 1, pi, ffi.NULL, pipeline_out,
    )
    if result != vk.VK_SUCCESS:
        raise RuntimeError(f"vkCreateGraphicsPipelines failed: {result}")
    return pipeline_out[0]