# Source code for simvx.graphics.renderer.text_pass

"""2D text overlay pass using MSDF atlas — renders after 3D geometry."""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any

import numpy as np
import vulkan as vk

from ..gpu.descriptors import (
    create_texture_descriptor_layout,
    write_texture_descriptor,
)
from ..gpu.memory import (
    create_buffer,
    upload_image_data,
    upload_numpy,
)
from .pass_helpers import create_linear_sampler, create_sampler_descriptor_pool, load_shader_modules

# Public API of this module: only the pass class is exported.
__all__ = ["TextPass"]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Shader directory: "shaders" under the fifth ancestor of this file's resolved path
# (presumably the repository root — confirm against the package layout).
SHADER_DIR = Path(__file__).resolve().parent.parent.parent.parent.parent / "shaders"

# Vertex format: pos(vec2) + uv(vec2) + colour(vec4) = 32 bytes
# (8 float32 components * 4 bytes each).
VERTEX_STRIDE = 32
# Capacity of the text buffers, in glyph quads per frame.
MAX_CHARS = 4096
# Byte sizes of the vertex/index buffers, derived from MAX_CHARS.
VERTEX_BUF_SIZE = MAX_CHARS * 4 * VERTEX_STRIDE  # 4 verts per char
INDEX_BUF_SIZE = MAX_CHARS * 6 * 4               # 6 uint32 indices per char


class TextPass:
    """GPU text rendering with MSDF atlas, proper CFFI pipeline.

    Typical per-frame use, as implied by the methods below:

    1. ``setup()`` once, to create shaders, sampler, descriptors, pipeline
       and the host-visible vertex/index buffers.
    2. ``upload_atlas_if_dirty()`` outside the render pass.
    3. ``upload_geometry(vertices, indices)`` with this frame's quads.
    4. ``render(cmd, width, height)`` inside the active render pass.
    5. ``cleanup()`` once, to release everything.
    """

    def __init__(self, engine: Any):
        """Store the engine; no GPU work happens until ``setup()``."""
        self._engine = engine
        self._pipeline: Any = None
        self._pipeline_layout: Any = None
        self._vert_module: Any = None
        self._frag_module: Any = None
        self._vertex_buffer: Any = None
        self._vertex_memory: Any = None
        self._index_buffer: Any = None
        self._index_memory: Any = None
        self._index_count = 0
        self._sampler: Any = None
        self._descriptor_layout: Any = None
        self._descriptor_pool: Any = None
        self._descriptor_set: Any = None
        self._atlas_image: Any = None
        self._atlas_memory: Any = None
        self._atlas_view: Any = None
        self._atlas_version = 0  # Tracks which atlas version is on GPU (0 = none)
        self._px_range: float = 4.0  # SDF range in atlas pixels
        self._ready = False

    def setup(self) -> None:
        """Initialize GPU resources."""
        e = self._engine
        device = e.ctx.device
        phys = e.ctx.physical_device

        # Compile shaders
        self._vert_module, self._frag_module = load_shader_modules(
            device, SHADER_DIR, "text.vert", "text.frag",
        )

        # Sampler (linear filtering for MSDF)
        self._sampler = create_linear_sampler(device)

        # Descriptor layout + pool + set for atlas texture
        self._descriptor_layout = create_texture_descriptor_layout(device, max_textures=1)
        self._descriptor_pool, desc_sets = create_sampler_descriptor_pool(
            device, self._descriptor_layout,
        )
        self._descriptor_set = desc_sets[0]

        # Pipeline (CFFI pattern matching create_ui_pipeline)
        self._pipeline, self._pipeline_layout = _create_text_pipeline(
            device, self._vert_module, self._frag_module,
            e.render_pass, e.extent, self._descriptor_layout,
        )

        # Vertex/index buffers (host-visible for per-frame updates)
        host_visible = (
            vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT
            | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT
        )
        self._vertex_buffer, self._vertex_memory = create_buffer(
            device, phys, VERTEX_BUF_SIZE,
            vk.VK_BUFFER_USAGE_VERTEX_BUFFER_BIT,
            host_visible,
        )
        self._index_buffer, self._index_memory = create_buffer(
            device, phys, INDEX_BUF_SIZE,
            vk.VK_BUFFER_USAGE_INDEX_BUFFER_BIT,
            host_visible,
        )

        self._ready = True

    @property
    def pipeline(self) -> Any:
        """Graphics pipeline handle (``None`` before ``setup()``)."""
        return self._pipeline

    @property
    def pipeline_layout(self) -> Any:
        """Pipeline layout handle (``None`` before ``setup()``)."""
        return self._pipeline_layout

    @property
    def descriptor_set(self) -> Any:
        """Descriptor set the atlas texture is written to."""
        return self._descriptor_set

    @property
    def px_range(self) -> float:
        """SDF range in atlas pixels, as last passed to ``upload_atlas``."""
        return self._px_range

    @property
    def atlas_version(self) -> int:
        """Version of the atlas currently on the GPU; 0 means none."""
        return self._atlas_version

    def upload_atlas_if_dirty(self) -> None:
        """Upload the shared TextRenderer's atlas if its version changed.

        Also checks Draw2D's atlas (same shared atlas after unification).
        Must be called outside the render pass (staging transfers).
        """
        if not self._ready:
            return
        # Imported lazily, after the readiness check.
        from ..draw2d import Draw2D

        atlas = Draw2D._font
        if atlas is None:
            return
        if atlas.version <= self._atlas_version:
            return
        self.upload_atlas(atlas.atlas, version=atlas.version, px_range=atlas.sdf_range)

    def _destroy_atlas(self, device: Any, wait_idle: bool) -> None:
        """Destroy the atlas view, image and memory (in that order), if present.

        Marks the atlas as absent (``_atlas_version = 0``) so ``render`` will not
        draw with a descriptor that points at a destroyed view.

        Args:
            device: Vulkan device that owns the resources.
            wait_idle: When true and anything exists, wait for the device to go
                idle first, since the old view may still be referenced by
                in-flight command buffers / descriptor sets.
        """
        if not (self._atlas_view or self._atlas_image or self._atlas_memory):
            return
        if wait_idle:
            vk.vkDeviceWaitIdle(device)
        if self._atlas_view:
            vk.vkDestroyImageView(device, self._atlas_view, None)
            self._atlas_view = None
        if self._atlas_image:
            vk.vkDestroyImage(device, self._atlas_image, None)
            self._atlas_image = None
        if self._atlas_memory:
            vk.vkFreeMemory(device, self._atlas_memory, None)
            self._atlas_memory = None
        self._atlas_version = 0

    def upload_atlas(self, atlas_data: np.ndarray, version: int = 1, px_range: float = 4.0) -> None:
        """Upload MSDF atlas (RGBA uint8) to GPU via staging buffer.

        Skips upload if the GPU already has this version. On version change,
        destroys old image/view/memory before creating new ones.

        ``px_range`` is recorded even when the upload itself is skipped.
        If the upload raises, the pass is left with no atlas
        (``atlas_version == 0``), so ``render`` draws nothing until a later
        upload succeeds.

        Args:
            atlas_data: Atlas pixels, shaped (H, W), (H, W, 3) or (H, W, 4).
            version: Atlas version; must exceed the version on the GPU.
            px_range: SDF range in atlas pixels, pushed to the shaders.
        """
        self._px_range = px_range
        if not self._ready or version <= self._atlas_version:
            return
        e = self._engine
        device = e.ctx.device

        # Destroy old atlas resources if re-uploading (waits for the GPU first).
        self._destroy_atlas(device, wait_idle=True)

        h, w = atlas_data.shape[:2]
        # Ensure RGBA and contiguous
        if atlas_data.ndim == 2:
            rgba = np.stack([atlas_data] * 4, axis=-1)
        elif atlas_data.shape[2] == 3:
            rgba = np.zeros((h, w, 4), dtype=np.uint8)
            rgba[:, :, :3] = atlas_data
            rgba[:, :, 3] = 255
        else:
            rgba = atlas_data
        rgba = np.ascontiguousarray(rgba)

        # Upload via staging buffer
        self._atlas_image, self._atlas_memory = upload_image_data(
            device, e.ctx.physical_device, e.ctx.graphics_queue,
            e.ctx.command_pool, rgba, w, h, vk.VK_FORMAT_R8G8B8A8_UNORM,
        )

        # Create image view
        view_ci = vk.VkImageViewCreateInfo(
            image=self._atlas_image,
            viewType=vk.VK_IMAGE_VIEW_TYPE_2D,
            format=vk.VK_FORMAT_R8G8B8A8_UNORM,
            subresourceRange=vk.VkImageSubresourceRange(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                baseMipLevel=0, levelCount=1,
                baseArrayLayer=0, layerCount=1,
            ),
        )
        self._atlas_view = vk.vkCreateImageView(device, view_ci, None)

        # Bind to descriptor set
        write_texture_descriptor(
            device, self._descriptor_set, 0, self._atlas_view, self._sampler,
        )
        # Only now is the atlas usable by render().
        self._atlas_version = version

    @staticmethod
    def _clamp_geometry(vertices: Any, indices: Any, max_verts: int, max_idx: int) -> tuple:
        """Fit geometry into the buffer capacity without dangling indices.

        When both arrays already fit, they are returned unchanged (same
        objects). Otherwise both are truncated, and only whole triangles whose
        three indices all refer to retained vertices are kept. The pipeline
        draws a triangle list, so indices are consumed in groups of three.
        Assumes ``indices`` is a flat 1-D array.

        Returns:
            ``(vertices, indices)`` safe to upload.
        """
        if len(vertices) <= max_verts and len(indices) <= max_idx:
            return vertices, indices
        vertices = vertices[:max_verts]
        flat = np.asarray(indices[:max_idx])
        whole = len(flat) - len(flat) % 3  # drop a trailing partial triangle
        triangles = flat[:whole].reshape(-1, 3)
        in_range = (triangles < len(vertices)).all(axis=1)
        return vertices, triangles[in_range].reshape(-1)

    def upload_geometry(self, vertices: np.ndarray, indices: np.ndarray) -> None:
        """Upload per-frame text vertex/index data.

        Draws nothing this frame (index count 0) when the pass is not set up
        or when either array is missing/empty. Geometry that exceeds the
        buffer capacity (``MAX_CHARS`` quads) is truncated with a warning;
        triangles that would reference truncated vertices are dropped.
        """
        if not self._ready or vertices is None or indices is None or len(indices) == 0:
            self._index_count = 0
            return

        max_verts = MAX_CHARS * 4
        max_idx = MAX_CHARS * 6
        if len(vertices) > max_verts:
            log.warning("TextPass overflow: %d verts (max %d), truncating", len(vertices), max_verts)
        if len(indices) > max_idx:
            log.warning("TextPass overflow: %d indices (max %d), truncating", len(indices), max_idx)
        vertices, indices = self._clamp_geometry(vertices, indices, max_verts, max_idx)
        if len(indices) == 0:
            self._index_count = 0
            return

        device = self._engine.ctx.device
        upload_numpy(device, self._vertex_memory, vertices)
        upload_numpy(device, self._index_memory, indices)
        self._index_count = len(indices)

    def render(self, cmd: Any, width: int, height: int) -> None:
        """Record text draw commands into the active render pass.

        No-op until the pass is set up, an atlas is on the GPU and there is
        geometry to draw.
        """
        if not self._ready or self._index_count == 0 or self._atlas_version == 0:
            return

        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, self._pipeline)

        # Bind atlas descriptor
        vk.vkCmdBindDescriptorSets(
            cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS,
            self._pipeline_layout, 0, 1, [self._descriptor_set], 0, None,
        )

        # Push screen size + px_range (3 float32 = 12 bytes)
        pc_data = np.array([width, height, self._px_range], dtype=np.float32)
        self._engine.push_constants(cmd, self._pipeline_layout, pc_data.tobytes())

        # Viewport/scissor
        vk_viewport = vk.VkViewport(
            x=0.0, y=0.0,
            width=float(width), height=float(height),
            minDepth=0.0, maxDepth=1.0,
        )
        vk.vkCmdSetViewport(cmd, 0, 1, [vk_viewport])

        scissor = vk.VkRect2D(
            offset=vk.VkOffset2D(x=0, y=0),
            extent=vk.VkExtent2D(width=width, height=height),
        )
        vk.vkCmdSetScissor(cmd, 0, 1, [scissor])

        # Bind buffers and draw
        vk.vkCmdBindVertexBuffers(cmd, 0, 1, [self._vertex_buffer], [0])
        vk.vkCmdBindIndexBuffer(cmd, self._index_buffer, 0, vk.VK_INDEX_TYPE_UINT32)
        vk.vkCmdDrawIndexed(cmd, self._index_count, 1, 0, 0, 0)

    def cleanup(self) -> None:
        """Release all GPU resources.

        Does not wait for the device itself; NOTE(review): presumably the
        caller has already idled the device — confirm. Afterwards the object
        is back in its pre-``setup()`` state, so a later ``setup()`` and atlas
        upload start from a clean slate.
        """
        if not self._ready:
            return
        device = self._engine.ctx.device

        # Atlas: view first, then image, then memory (same order as re-upload).
        self._destroy_atlas(device, wait_idle=False)

        for obj, fn in [
            (self._vertex_buffer, vk.vkDestroyBuffer),
            (self._index_buffer, vk.vkDestroyBuffer),
            (self._pipeline, vk.vkDestroyPipeline),
            (self._pipeline_layout, vk.vkDestroyPipelineLayout),
            (self._vert_module, vk.vkDestroyShaderModule),
            (self._frag_module, vk.vkDestroyShaderModule),
            (self._sampler, vk.vkDestroySampler),
            (self._descriptor_layout, vk.vkDestroyDescriptorSetLayout),
            (self._descriptor_pool, vk.vkDestroyDescriptorPool),
        ]:
            if obj:
                fn(device, obj, None)
        for mem in [self._vertex_memory, self._index_memory]:
            if mem:
                vk.vkFreeMemory(device, mem, None)

        # Forget every handle so nothing stale can be reused or freed twice.
        self._vertex_buffer = self._vertex_memory = None
        self._index_buffer = self._index_memory = None
        self._pipeline = self._pipeline_layout = None
        self._vert_module = self._frag_module = None
        self._sampler = None
        self._descriptor_layout = self._descriptor_pool = None
        self._descriptor_set = None
        self._index_count = 0
        self._ready = False
def _create_text_pipeline(
    device: Any,
    vert_module: Any,
    frag_module: Any,
    render_pass: Any,
    extent: tuple[int, int],
    descriptor_layout: Any,
) -> tuple[Any, Any]:
    """Create text rendering pipeline using raw CFFI.

    Vertex format: pos(vec2) + uv(vec2) + colour(vec4) = 32 bytes.
    Push constant: screen_size(vec2) + px_range(float) = 12 bytes.
    Descriptor set 0: MSDF atlas (combined image sampler).
    No depth test, alpha blending, cull none.

    Args:
        device: Vulkan device handle.
        vert_module: Vertex shader module (entry point ``main``).
        frag_module: Fragment shader module (entry point ``main``).
        render_pass: Render pass the pipeline is created for.
        extent: ``(width, height)`` used for the static viewport/scissor
            fields; both are also declared dynamic state.
        descriptor_layout: Descriptor set layout bound as set 0.

    Returns:
        ``(pipeline, pipeline_layout)`` handles.

    Raises:
        RuntimeError: If layout or pipeline creation does not return
            ``VK_SUCCESS``. The layout is destroyed first when pipeline
            creation is the step that fails.
    """
    ffi = vk.ffi

    # Push constant range: vec2 screen_size(8) + float px_range(4) = 12 bytes
    push_range = ffi.new("VkPushConstantRange*")
    push_range.stageFlags = vk.VK_SHADER_STAGE_VERTEX_BIT | vk.VK_SHADER_STAGE_FRAGMENT_BIT
    push_range.offset = 0
    push_range.size = 12

    # Pipeline layout with descriptor set + push constants
    set_layouts = ffi.new("VkDescriptorSetLayout[1]", [descriptor_layout])
    layout_ci = ffi.new("VkPipelineLayoutCreateInfo*")
    layout_ci.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_LAYOUT_CREATE_INFO
    layout_ci.setLayoutCount = 1
    layout_ci.pSetLayouts = set_layouts
    layout_ci.pushConstantRangeCount = 1
    layout_ci.pPushConstantRanges = push_range

    layout_out = ffi.new("VkPipelineLayout*")
    result = vk._vulkan._callApi(
        vk._vulkan.lib.vkCreatePipelineLayout,
        device, layout_ci, ffi.NULL, layout_out,
    )
    if result != vk.VK_SUCCESS:
        raise RuntimeError(f"vkCreatePipelineLayout failed: {result}")
    pipeline_layout = layout_out[0]

    # Every ffi.new() result below stays bound to a local until the create call
    # returns; the create-info struct only stores raw pointers to them.
    pi = ffi.new("VkGraphicsPipelineCreateInfo*")
    pi.sType = vk.VK_STRUCTURE_TYPE_GRAPHICS_PIPELINE_CREATE_INFO

    # Shader stages
    stages = ffi.new("VkPipelineShaderStageCreateInfo[2]")
    main_name = ffi.new("char[]", b"main")
    stages[0].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
    stages[0].stage = vk.VK_SHADER_STAGE_VERTEX_BIT
    stages[0].module = vert_module
    stages[0].pName = main_name
    stages[1].sType = vk.VK_STRUCTURE_TYPE_PIPELINE_SHADER_STAGE_CREATE_INFO
    stages[1].stage = vk.VK_SHADER_STAGE_FRAGMENT_BIT
    stages[1].module = frag_module
    stages[1].pName = main_name
    pi.stageCount = 2
    pi.pStages = stages

    # Vertex input: pos(vec2) + uv(vec2) + colour(vec4) = 32 bytes
    binding_desc = ffi.new("VkVertexInputBindingDescription*")
    binding_desc.binding = 0
    binding_desc.stride = VERTEX_STRIDE
    binding_desc.inputRate = vk.VK_VERTEX_INPUT_RATE_VERTEX

    attr_descs = ffi.new("VkVertexInputAttributeDescription[3]")
    # location 0: position, 2 floats at byte offset 0
    attr_descs[0].location = 0
    attr_descs[0].binding = 0
    attr_descs[0].format = vk.VK_FORMAT_R32G32_SFLOAT
    attr_descs[0].offset = 0
    # location 1: uv, 2 floats at byte offset 8
    attr_descs[1].location = 1
    attr_descs[1].binding = 0
    attr_descs[1].format = vk.VK_FORMAT_R32G32_SFLOAT
    attr_descs[1].offset = 8
    # location 2: colour, 4 floats at byte offset 16
    attr_descs[2].location = 2
    attr_descs[2].binding = 0
    attr_descs[2].format = vk.VK_FORMAT_R32G32B32A32_SFLOAT
    attr_descs[2].offset = 16

    vi = ffi.new("VkPipelineVertexInputStateCreateInfo*")
    vi.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VERTEX_INPUT_STATE_CREATE_INFO
    vi.vertexBindingDescriptionCount = 1
    vi.pVertexBindingDescriptions = binding_desc
    vi.vertexAttributeDescriptionCount = 3
    vi.pVertexAttributeDescriptions = attr_descs
    pi.pVertexInputState = vi

    # Input assembly
    ia = ffi.new("VkPipelineInputAssemblyStateCreateInfo*")
    ia.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_INPUT_ASSEMBLY_STATE_CREATE_INFO
    ia.topology = vk.VK_PRIMITIVE_TOPOLOGY_TRIANGLE_LIST
    pi.pInputAssemblyState = ia

    # Viewport state
    vps = ffi.new("VkPipelineViewportStateCreateInfo*")
    vps.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_VIEWPORT_STATE_CREATE_INFO
    vps.viewportCount = 1
    viewport = ffi.new("VkViewport*")
    viewport.width = float(extent[0])
    viewport.height = float(extent[1])
    viewport.maxDepth = 1.0
    vps.pViewports = viewport
    scissor = ffi.new("VkRect2D*")
    scissor.extent.width = extent[0]
    scissor.extent.height = extent[1]
    vps.scissorCount = 1
    vps.pScissors = scissor
    pi.pViewportState = vps

    # Rasterization — no culling for 2D text
    rs = ffi.new("VkPipelineRasterizationStateCreateInfo*")
    rs.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_RASTERIZATION_STATE_CREATE_INFO
    rs.polygonMode = vk.VK_POLYGON_MODE_FILL
    rs.lineWidth = 1.0
    rs.cullMode = vk.VK_CULL_MODE_NONE
    pi.pRasterizationState = rs

    # Multisample
    ms = ffi.new("VkPipelineMultisampleStateCreateInfo*")
    ms.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_MULTISAMPLE_STATE_CREATE_INFO
    ms.rasterizationSamples = vk.VK_SAMPLE_COUNT_1_BIT
    pi.pMultisampleState = ms

    # No depth test for overlay text
    dss = ffi.new("VkPipelineDepthStencilStateCreateInfo*")
    dss.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DEPTH_STENCIL_STATE_CREATE_INFO
    dss.depthTestEnable = 0
    dss.depthWriteEnable = 0
    pi.pDepthStencilState = dss

    # Colour blend — alpha blending
    cba = ffi.new("VkPipelineColorBlendAttachmentState*")
    cba.blendEnable = 1
    cba.srcColorBlendFactor = vk.VK_BLEND_FACTOR_SRC_ALPHA
    cba.dstColorBlendFactor = vk.VK_BLEND_FACTOR_ONE_MINUS_SRC_ALPHA
    cba.colorBlendOp = vk.VK_BLEND_OP_ADD
    cba.srcAlphaBlendFactor = vk.VK_BLEND_FACTOR_ONE
    cba.dstAlphaBlendFactor = vk.VK_BLEND_FACTOR_ZERO
    cba.alphaBlendOp = vk.VK_BLEND_OP_ADD
    cba.colorWriteMask = (
        vk.VK_COLOR_COMPONENT_R_BIT | vk.VK_COLOR_COMPONENT_G_BIT
        | vk.VK_COLOR_COMPONENT_B_BIT | vk.VK_COLOR_COMPONENT_A_BIT
    )

    cb = ffi.new("VkPipelineColorBlendStateCreateInfo*")
    cb.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_COLOR_BLEND_STATE_CREATE_INFO
    cb.attachmentCount = 1
    cb.pAttachments = cba
    pi.pColorBlendState = cb

    # Dynamic state (viewport + scissor)
    dyn_states = ffi.new("VkDynamicState[2]", [
        vk.VK_DYNAMIC_STATE_VIEWPORT,
        vk.VK_DYNAMIC_STATE_SCISSOR,
    ])
    ds = ffi.new("VkPipelineDynamicStateCreateInfo*")
    ds.sType = vk.VK_STRUCTURE_TYPE_PIPELINE_DYNAMIC_STATE_CREATE_INFO
    ds.dynamicStateCount = 2
    ds.pDynamicStates = dyn_states
    pi.pDynamicState = ds

    pi.layout = pipeline_layout
    pi.renderPass = render_pass

    pipeline_out = ffi.new("VkPipeline*")
    result = vk._vulkan._callApi(
        vk._vulkan.lib.vkCreateGraphicsPipelines,
        device, ffi.NULL, 1, pi, ffi.NULL, pipeline_out,
    )
    if result != vk.VK_SUCCESS:
        # The layout was created above and nobody else holds it yet; release
        # it so a failed pipeline build does not leak it.
        vk.vkDestroyPipelineLayout(device, pipeline_layout, None)
        raise RuntimeError(f"vkCreateGraphicsPipelines failed: {result}")

    log.debug("Text pipeline created")
    return pipeline_out[0], pipeline_layout