"""Overlay rendering — debug lines, text, and particles."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
import numpy as np
import vulkan as vk
from ..gpu.memory import create_buffer, upload_numpy
from ..materials.shader_compiler import compile_shader
if TYPE_CHECKING:
from .forward import ForwardRenderer
__all__ = ["OverlayRenderer"]
log = logging.getLogger(__name__)
[docs]
class OverlayRenderer:
    """Record overlay draw work (GPU particles, debug lines, text) for a renderer.

    The class holds no GPU state of its own: every pipeline, buffer and
    submission list lives on the owning renderer and is reached through
    ``self._r``.
    """

    def __init__(self, renderer: ForwardRenderer) -> None:
        """Bind the overlay renderer to the *renderer* whose state it operates on."""
        self._r = renderer

    def dispatch_gpu_particles(self, cmd: Any) -> None:
        """Dispatch the particle compute work for every submitted GPU emitter.

        Does nothing when no GPU particle configs were submitted. The compute
        object is created and set up lazily on the first call that has
        submissions.

        Args:
            cmd: Command buffer handed through to ``ParticleCompute.dispatch``.
        """
        r = self._r
        submissions = r._gpu_particle_submissions
        if not submissions:
            return

        # Lazy-init the compute pipeline on first use.
        if r._particle_compute is None:
            from .particle_compute import ParticleCompute

            r._particle_compute = ParticleCompute(r._engine)
            # Size the simulation for the largest emitter submitted this frame.
            # NOTE(review): setup() only runs here, so a later emitter with a
            # larger "max_particles" does not trigger a resize - confirm that
            # ParticleCompute handles that case itself.
            max_p = max(cfg.get("max_particles", 1024) for cfg in submissions)
            r._particle_compute.setup(max_p)

        # Fall back to a 60 Hz step when the engine has not recorded a delta yet.
        dt = getattr(r._engine, "_last_dt", 1.0 / 60.0)
        for cfg in submissions:
            r._particle_compute.dispatch(cmd, dt, cfg)

    def render_particles(self, cmd: Any, extent: tuple[int, int]) -> None:
        """Render every submitted particle system with the first viewport's camera.

        Returns without touching the camera matrices when there is nothing to
        draw or no viewport exists.

        Args:
            cmd: Command buffer handed through to the particle pass.
            extent: ``(width, height)`` handed through to the particle pass.
        """
        r = self._r
        submissions = r._particle_submissions
        if not submissions:
            # Nothing to draw: skip the matrix product and inversion entirely.
            return
        viewports = r.viewport_manager.get_all()
        if not viewports:
            return

        _, viewport = viewports[0]
        view_proj = viewport.camera_proj @ viewport.camera_view

        # Columns 0 and 1 of the inverse view matrix are passed on as the
        # camera's right and up vectors.
        view_inv = np.linalg.inv(viewport.camera_view)
        camera_right = view_inv[:3, 0].astype(np.float32)
        camera_up = view_inv[:3, 1].astype(np.float32)

        for data, _ in submissions:
            r._particle_pass.render(cmd, data, view_proj, camera_right, camera_up, extent)

    def render_debug_lines(self, cmd: Any, extent: tuple[int, int]) -> None:
        """Record the draw for queued debug lines, then clear the queue.

        Returns immediately when ``DebugDraw`` has no vertex data. When there
        is data but no viewport, the queue is cleared without creating or
        uploading any GPU resources.

        Args:
            cmd: Command buffer the draw commands are recorded into.
            extent: ``(width, height)`` used for the dynamic viewport and scissor.
        """
        from ..debug_draw import DebugDraw

        r = self._r
        vertex_data = DebugDraw.get_vertex_data()
        if vertex_data is None:
            return

        # Check for a camera before doing any GPU work: without a viewport
        # nothing can be drawn, so creating/uploading buffers would be wasted.
        viewports = r.viewport_manager.get_all()
        if not viewports:
            DebugDraw._clear()
            return
        _, viewport = viewports[0]

        e = r._engine
        device = e.ctx.device

        self._ensure_debug_pipeline(extent)
        self._ensure_debug_vertex_buffer(vertex_data.nbytes)
        upload_numpy(device, r._debug_vb_mem, vertex_data)

        # Push constants are view followed by projection. Forcing float32
        # keeps the payload at 2 * 64 bytes even if the camera matrices are
        # stored as float64. Transposing before tobytes() emits each matrix
        # column by column (presumably what the line shader expects - confirm
        # against line.vert).
        view_t = np.ascontiguousarray(viewport.camera_view.T, dtype=np.float32)
        proj_t = np.ascontiguousarray(viewport.camera_proj.T, dtype=np.float32)
        pc_data = view_t.tobytes() + proj_t.tobytes()

        # Record draw commands.
        vk.vkCmdBindPipeline(cmd, vk.VK_PIPELINE_BIND_POINT_GRAPHICS, r._debug_pipeline)

        vk_viewport = vk.VkViewport(
            x=0.0,
            y=0.0,
            width=float(extent[0]),
            height=float(extent[1]),
            minDepth=0.0,
            maxDepth=1.0,
        )
        vk.vkCmdSetViewport(cmd, 0, 1, [vk_viewport])

        scissor = vk.VkRect2D(
            offset=vk.VkOffset2D(x=0, y=0),
            extent=vk.VkExtent2D(width=extent[0], height=extent[1]),
        )
        vk.vkCmdSetScissor(cmd, 0, 1, [scissor])

        e.push_constants(cmd, r._debug_pipeline_layout, pc_data)
        vk.vkCmdBindVertexBuffers(cmd, 0, 1, [r._debug_vb], [0])
        vk.vkCmdDraw(cmd, DebugDraw.vertex_count(), 1, 0, 0)

        DebugDraw._clear()

    def _ensure_debug_pipeline(self, extent: tuple[int, int]) -> None:
        """Create the debug line shaders and pipeline on first use."""
        r = self._r
        if r._debug_pipeline is not None:
            return

        from ..gpu.pipeline import create_line_pipeline, create_shader_module

        e = r._engine
        device = e.ctx.device
        shader_dir = e.shader_dir
        vert_spv = compile_shader(shader_dir / "line.vert")
        frag_spv = compile_shader(shader_dir / "line.frag")
        r._debug_vert_module = create_shader_module(device, vert_spv)
        r._debug_frag_module = create_shader_module(device, frag_spv)
        r._debug_pipeline, r._debug_pipeline_layout = create_line_pipeline(
            device,
            r._debug_vert_module,
            r._debug_frag_module,
            e.render_pass,
            extent,
        )

    def _ensure_debug_vertex_buffer(self, needed: int) -> None:
        """Grow the host-visible debug vertex buffer to hold *needed* bytes."""
        r = self._r
        if needed <= r._debug_vb_capacity:
            return

        e = r._engine
        device = e.ctx.device
        if r._debug_vb:
            # NOTE(review): the old buffer is destroyed with no visible wait;
            # confirm no in-flight command buffer can still reference it.
            vk.vkDestroyBuffer(device, r._debug_vb, None)
            vk.vkFreeMemory(device, r._debug_vb_mem, None)

        # Never allocate less than 4 KiB so small line sets share one buffer.
        new_cap = max(needed, 4096)
        r._debug_vb, r._debug_vb_mem = create_buffer(
            device,
            e.ctx.physical_device,
            new_cap,
            vk.VK_BUFFER_USAGE_VERTEX_BUFFER_BIT,
            vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
        )
        r._debug_vb_capacity = new_cap

    def render_text(self, cmd: Any, extent: tuple[int, int]) -> None:
        """Render the text overlay using the renderer's TextRenderer and TextPass.

        Does nothing when either object is missing or no text is queued.
        ``RuntimeError``, ``ValueError``, ``OSError`` and ``KeyError`` raised
        while uploading or drawing are logged as a warning instead of being
        propagated.

        Args:
            cmd: Command buffer handed through to ``TextPass.render``.
            extent: ``(width, height)`` handed through to ``TextPass.render``.
        """
        r = self._r
        tr = r._text_renderer
        tp = r._text_pass
        if not tp or not tr or not tr.has_text:
            return

        try:
            # Atlas upload already handled by TextPass.upload_atlas_if_dirty() in pre_render.
            # For overlay text from TextRenderer, ensure its atlas is also uploaded.
            # Only one atlas is supported for now, so just the first is used.
            atlas = next(iter(tr._atlases.values()), None)
            if atlas is not None:
                tp.upload_atlas(atlas.atlas, version=atlas.version, px_range=atlas.sdf_range)

            # Upload per-frame geometry.
            vertices = tr.get_vertices()
            indices = tr.get_indices()
            tp.upload_geometry(vertices, indices)

            # Record draw commands.
            tp.render(cmd, extent[0], extent[1])
        except (RuntimeError, ValueError, OSError, KeyError) as exc:
            log.warning("Text rendering failed: %s", exc)