"""SubViewport render-to-texture: offscreen targets for ``core.SubViewport`` nodes.
A :class:`~simvx.core.SubViewport` renders its own subtree (with its own
camera) into an offscreen texture that other nodes in the *main* scene can
sample: a "live monitor in the world", a security-camera feed, a minimap, a
render-to-texture portal, etc.
Architecture mirrors :class:`~simvx.graphics.renderer.game_viewport.GameViewportRenderer`
(the proven editor game-preview path): each SubViewport owns a
:class:`RenderTarget`, registered once as a *bindless texture* whose slot id
stays stable across resizes so consumers that captured ``subviewport.texture``
keep working. :class:`SubViewportManager` keys one renderer per SubViewport by
``id(node)``, creates it on first sight, debounce-resizes on size changes, and
unregisters on exit-tree / teardown.
Frame ordering: all SubViewports are rendered **flat**, before the main scene
pass, in :meth:`SubViewportManager.render_all` (driven from the engine
``pre_render`` callback). Each target finishes in ``SHADER_READ_ONLY`` so the
main pass can sample it the same frame. A SubViewport that samples *another*
SubViewport sees last frame's content (one-frame lag): topological ordering is
out of scope for v1.
"""
from __future__ import annotations
import logging
from typing import Any
import vulkan as vk
from .draw2d_pass import Draw2DPass
from .render_target import RenderTarget
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Names exported by a star-import; the private ``_SubTreeView`` helper is not listed.
__all__ = ["SubViewportRenderer", "SubViewportManager"]
class SubViewportRenderer:
    """One offscreen render target for a single SubViewport node.

    Wraps a :class:`RenderTarget` and exposes the ``ready`` / ``width`` /
    ``height`` / ``begin_pass`` / ``end_pass`` / ``render_draw2d`` surface that
    :meth:`SceneAdapter.render_to_target` expects: the same contract
    :class:`GameViewportRenderer` satisfies.

    The bindless texture slot is registered once and then only *updated* when
    the backing target is rebuilt, so the id handed out via ``texture_id``
    stays stable for the lifetime of the renderer.
    """

    def __init__(self, engine: Any) -> None:
        """Remember *engine*; no GPU objects exist until :meth:`create` runs."""
        self._engine = engine
        self._target: RenderTarget | None = None
        self._draw2d_pass: Draw2DPass | None = None
        self._texture_id: int = -1
        self._width: int = 0
        self._height: int = 0
        # Per-frame clear colour, set by the manager from the SubViewport's
        # transparent_bg flag before begin_pass. Kept per instance so no
        # mutable list is shared between renderers through the class.
        self._clear_colour: list[float] = [0.0, 0.0, 0.0, 1.0]
        # Match the HDR offscreen pass format so the forward renderer's
        # pipelines (compiled against the HDR pass) are render-pass compatible.
        self._colour_format = vk.VK_FORMAT_R16G16B16A16_SFLOAT

    # ------------------------------------------------------------------ #
    # Lifetime
    # ------------------------------------------------------------------ #

    def create(self, width: int, height: int) -> None:
        """Create the offscreen target and register its colour view as bindless.

        Does nothing when either dimension is below 1. If a target already
        exists the call is routed to :meth:`resize`, so the live target and
        its slot are never overwritten (and leaked) by a second ``create``.
        """
        if width < 1 or height < 1:
            return
        if self._target is not None:
            self.resize(width, height)
            return
        self._build(width, height)
        log.debug("SubViewportRenderer created %dx%d, texture_id=%d", width, height, self._texture_id)

    def resize(self, width: int, height: int) -> None:
        """Recreate the target at a new size, preserving the bindless slot.

        The slot id handed out as ``subviewport.texture`` stays stable; only
        the backing image view changes, so a Sprite2D / Material that captured
        the slot keeps sampling the live feed after a resize.

        A request for the current size, or for a size with a dimension below
        1, is ignored.
        """
        if width < 1 or height < 1 or (width, height) == (self._width, self._height):
            return
        # The old target may still be referenced by in-flight work.
        vk.vkDeviceWaitIdle(self._engine.ctx.device)
        self._release_gpu_objects()
        self._build(width, height)

    def destroy(self) -> None:
        """Release the target, Draw2D pass, and the bindless slot.

        Safe to call repeatedly. The slot is unregistered even when no target
        or Draw2D pass is alive any more (for example after a rebuild that
        failed part-way), so a registered slot is never left behind.
        """
        if self._target is not None or self._draw2d_pass is not None:
            vk.vkDeviceWaitIdle(self._engine.ctx.device)
            self._release_gpu_objects()
        if self._texture_id >= 0:
            self._engine.unregister_texture(self._texture_id)
            self._texture_id = -1
        self._width = 0
        self._height = 0

    def _release_gpu_objects(self) -> None:
        """Tear down the Draw2D pass and the render target, if present."""
        if self._draw2d_pass is not None:
            self._draw2d_pass.cleanup()
            self._draw2d_pass = None
        if self._target is not None:
            self._target.destroy()
            self._target = None

    def _build(self, width: int, height: int) -> None:
        """Construct target + Draw2D pass at *width* x *height* and publish the view.

        Reuses the existing bindless slot when one is registered, otherwise
        registers a new one.
        """
        ctx = self._engine.ctx
        self._width = width
        self._height = height
        self._target = RenderTarget(
            ctx.device,
            ctx.physical_device,
            width, height,
            colour_format=self._colour_format,
            use_depth=True,
            samplable_depth=True,
            queue=ctx.graphics_queue,
            command_pool=ctx.command_pool,
        )
        if self._texture_id >= 0:
            self._engine.update_texture(self._texture_id, self._target.colour_view)
        else:
            self._texture_id = self._engine.register_texture(self._target.colour_view)
        text_pass = getattr(self._engine.renderer, "_text_pass", None)
        self._draw2d_pass = Draw2DPass(self._engine, text_pass=text_pass)
        self._draw2d_pass.setup(render_pass=self._target.render_pass, extent=(width, height))

    # ------------------------------------------------------------------ #
    # Recording
    # ------------------------------------------------------------------ #

    @staticmethod
    def _begin_render_pass(cmd: Any, rt: Any, render_pass: Any, clear_values: list | None) -> None:
        """Begin *render_pass* on *rt*'s framebuffer with full-target viewport and scissor."""

        def full_rect() -> Any:
            return vk.VkRect2D(
                offset=vk.VkOffset2D(x=0, y=0),
                extent=vk.VkExtent2D(width=rt.width, height=rt.height),
            )

        rp_begin = vk.VkRenderPassBeginInfo(
            renderPass=render_pass,
            framebuffer=rt.framebuffer,
            renderArea=full_rect(),
            clearValueCount=len(clear_values) if clear_values else 0,
            pClearValues=clear_values or None,
        )
        vk.vkCmdBeginRenderPass(cmd, rp_begin, vk.VK_SUBPASS_CONTENTS_INLINE)
        vk.vkCmdSetViewport(cmd, 0, 1, [vk.VkViewport(
            x=0.0, y=0.0, width=float(rt.width), height=float(rt.height), minDepth=0.0, maxDepth=1.0,
        )])
        vk.vkCmdSetScissor(cmd, 0, 1, [full_rect()])

    def begin_pass(self, cmd: Any) -> None:
        """Begin the offscreen colour+depth pass (clear).

        Colour is cleared to ``_clear_colour`` and depth to 1.0. No-op when
        the target has not been created.
        """
        rt = self._target
        if rt is None:
            return
        clear_values = [
            vk.VkClearValue(color=vk.VkClearColorValue(float32=self._clear_colour)),
            vk.VkClearValue(depthStencil=vk.VkClearDepthStencilValue(depth=1.0, stencil=0)),
        ]
        self._begin_render_pass(cmd, rt, rt.render_pass, clear_values)

    def end_pass(self, cmd: Any) -> None:
        """End the pass opened by :meth:`begin_pass` (no-op without a target)."""
        if self._target is not None:
            vk.vkCmdEndRenderPass(cmd)

    def render_draw2d(self, cmd: Any, ops: list) -> None:
        """Overlay pre-extracted Draw2D ops on top of the 3D content (LOAD_OP_LOAD).

        Records a separate pass using the target's ``overlay_render_pass`` with
        no clear values. Skipped when there is no target, no Draw2D pass, or
        *ops* is empty.
        """
        rt = self._target
        if rt is None or not ops or self._draw2d_pass is None:
            return
        self._begin_render_pass(cmd, rt, rt.overlay_render_pass, None)
        self._draw2d_pass.render(cmd, rt.width, rt.height, ops=ops)
        vk.vkCmdEndRenderPass(cmd)

    # ------------------------------------------------------------------ #
    # Read-only state
    # ------------------------------------------------------------------ #

    @property
    def texture_id(self) -> int:
        """Bindless slot id, or ``-1`` when nothing is registered."""
        return self._texture_id

    @property
    def width(self) -> int:
        """Current target width in pixels (0 before creation / after destroy)."""
        return self._width

    @property
    def height(self) -> int:
        """Current target height in pixels (0 before creation / after destroy)."""
        return self._height

    @property
    def ready(self) -> bool:
        """``True`` once a target exists and a bindless slot is registered."""
        return self._target is not None and self._texture_id >= 0
class _SubTreeView:
    """Lightweight stand-in for a SceneTree, scoped to one SubViewport.

    :meth:`SceneAdapter.render_to_target` / :meth:`SceneAdapter.submit_scene`
    only read a small, fixed set of ``tree`` attributes. Instead of building a
    full :class:`SceneTree` (which fires lifecycle hooks, owns input tables,
    etc.) for every SubViewport, this object carries just those attributes.
    ``root`` is the SubViewport node itself, so ``_collect_nodes`` walks only
    its children.
    """

    __slots__ = (
        "root",
        "_screen_size",
        "_structure_version",
        "play_viewport_rect",
        "_render_camera_override",
        "_current_camera_2d",
        "overlay_offset",
        "_app",
    )

    def __init__(self, root: Any, screen_size: tuple[float, float]) -> None:
        """Bind the view to *root* and seed every tree attribute with its default."""
        # Caller-supplied state.
        self.root = root
        self._screen_size = screen_size
        # Optional hooks, all unset for an offscreen subtree.
        self._app = None
        self._current_camera_2d = None
        self._render_camera_override = None
        self.play_viewport_rect: tuple[float, float, float, float] | None = None
        self.overlay_offset = (0.0, 0.0)
        # -1 forces a re-walk on first submit.
        self._structure_version = -1

    @property
    def screen_size(self) -> tuple[float, float]:
        """Size reported to the adapter; reads the ``_screen_size`` slot."""
        return self._screen_size
class SubViewportManager:
    """Owns one :class:`SubViewportRenderer` per live SubViewport node.

    Created once per :class:`App` run and invoked from the engine ``pre_render``
    callback via :meth:`render_all`. Discovers SubViewports each frame by
    walking the main scene tree, lazily creating a target on first sight,
    debounce-resizing when ``SubViewport.size`` changes, and reaping targets
    whose nodes have left the tree.
    """

    # Resize debounce: only resize once the requested size has been stable for
    # this many consecutive frames (mirrors PlayMode.ensure_game_viewport_size).
    _RESIZE_DEBOUNCE = 6

    def __init__(self, engine: Any, adapter: Any) -> None:
        """Store the engine and the shared scene adapter; no targets exist yet."""
        self._engine = engine
        self._adapter = adapter
        # id(node) -> renderer / view / debounce bookkeeping
        self._renderers: dict[int, SubViewportRenderer] = {}
        self._views: dict[int, _SubTreeView] = {}
        self._nodes: dict[int, Any] = {}  # keep a ref so id() stays unique while live
        self._pending_size: dict[int, tuple[int, int]] = {}
        self._stable_frames: dict[int, int] = {}
        # Monotonic stamp handed to each view's _structure_version before a
        # submit. The shared SceneAdapter caches its collected-node list by
        # (tree, _structure_version); without a unique stamp per view, two
        # views (both starting at -1) or a view and the main tree could collide
        # and one would render another tree's geometry. A fresh value each
        # render forces a correct re-walk per view.
        self._version_counter: int = 1_000_000

    def render_all(self, tree: Any) -> bool:
        """Render every SubViewport in *tree* into its offscreen target.

        Called from the engine ``pre_render`` callback, before the main scene
        pass. Each SubViewport renders its own subtree with its own camera.

        Each SubViewport renders on its **own dedicated one-time command
        buffer**, submitted and *waited on* before the next viewport (and before
        the main frame's command buffer). This is mandatory: the renderer keeps
        a single per-frame transform / material SSBO that ``render_to_target``
        overwrites. If two viewports (or a viewport and the main scene) shared
        one command buffer (executed together at submit), the last SSBO upload
        would win and earlier passes would render with the wrong transforms.
        Fully completing each offscreen pass first leaves its target in
        ``SHADER_READ_ONLY`` and frees the SSBO for the next pass to reuse.

        Returns ``True`` if at least one SubViewport rendered this frame: the
        caller must then re-submit the main scene, since ``render_to_target``
        cleared the renderer's per-frame submission lists.

        A SubViewport that samples *another* SubViewport sees the previous
        frame's content (one-frame lag); v1 does not topologically order them.
        """
        if tree is None or tree.root is None:
            return False
        # Lazy import keeps this module import-light and avoids a hard core dep
        # at module load (the type is only needed for isinstance discovery).
        from simvx.core import SubViewport
        from ..draw2d import Draw2D
        from ..gpu.memory import begin_single_time_commands, end_single_time_commands

        live = self._collect_subviewports(tree.root, SubViewport)
        self._reap({id(node) for node in live})
        if not live:
            return False

        ctx = self._engine.ctx
        rendered_any = False
        for node in live:
            key = id(node)
            w, h = int(node.size[0]), int(node.size[1])
            if w < 1 or h < 1:
                continue
            rend = self._ensure_renderer(key, node, w, h)
            if rend is None or not rend.ready:
                continue
            self._debounce_resize(key, rend, node, w, h)

            # Honour update_mode: "always" (default) renders every frame;
            # "once" renders a single frame then freezes; "disabled" never
            # renders (slot stays valid but stale).
            mode = getattr(node, "render_target_update_mode", "always")
            frozen = mode == "disabled" or (
                mode == "once" and getattr(node, "_svp_rendered_once", False)
            )
            if frozen:
                node._texture_id = rend.texture_id
                continue

            size = (float(rend.width), float(rend.height))
            view = self._views[key]
            self._version_counter += 1
            view._structure_version = self._version_counter
            view._screen_size = size
            view._current_camera_2d = self._find_camera_2d(node)
            camera_3d = self._find_camera_3d(node)
            rend._clear_colour = (
                [0.0, 0.0, 0.0, 0.0] if getattr(node, "transparent_bg", False)
                else [0.0, 0.0, 0.0, 1.0]
            )

            # Extract the subtree's own Draw2D overlay in isolation so it
            # doesn't leak into the main scene's Draw2D op list.
            with Draw2D._isolated():
                node._draw_recursive(Draw2D)
                sub_ops = list(Draw2D._ops)

            # Each SubViewport renders + submits + waits on its OWN one-time
            # command buffer. This is required (not just for the main pass):
            # the renderer's single per-frame transform / material SSBO is
            # rewritten by every render_to_target, so two SubViewports sharing
            # one command buffer would have the second's SSBO upload corrupt the
            # first's already-recorded (but not-yet-executed) draws.
            cmd = begin_single_time_commands(ctx.device, ctx.command_pool)
            self._adapter.render_to_target(
                cmd, rend, view,
                camera=camera_3d,
                screen_size=size,
                draw2d_ops=sub_ops or None,
            )
            end_single_time_commands(ctx.device, ctx.graphics_queue, ctx.command_pool, cmd)

            node._texture_id = rend.texture_id
            node._svp_rendered_once = True
            rendered_any = True
        return rendered_any

    def _ensure_renderer(self, key: int, node: Any, w: int, h: int) -> SubViewportRenderer | None:
        """Return the renderer for *key*, creating and registering it on first sight.

        Returns ``None`` (after cleaning up) when creation did not yield a
        ready renderer.
        """
        existing = self._renderers.get(key)
        if existing is not None:
            return existing
        rend = SubViewportRenderer(self._engine)
        rend.create(w, h)
        if not rend.ready:
            rend.destroy()
            return None
        self._renderers[key] = rend
        self._views[key] = _SubTreeView(node, (float(w), float(h)))
        self._nodes[key] = node
        self._pending_size[key] = (w, h)
        self._stable_frames[key] = 0
        node._texture_id = rend.texture_id
        return rend

    def _debounce_resize(self, key: int, rend: SubViewportRenderer, node: Any, w: int, h: int) -> None:
        """Resize *rend* once the requested size has been stable long enough.

        A new requested size restarts the count; the resize fires after the
        same size has been requested for ``_RESIZE_DEBOUNCE`` further
        consecutive calls.
        """
        target = (max(w, 1), max(h, 1))
        if (rend.width, rend.height) == target:
            # Already at the requested size: nothing pending.
            self._pending_size[key] = target
            self._stable_frames[key] = 0
            return
        if target != self._pending_size.get(key):
            self._pending_size[key] = target
            self._stable_frames[key] = 0
        else:
            self._stable_frames[key] += 1
        if self._stable_frames[key] >= self._RESIZE_DEBOUNCE:
            rend.resize(*target)
            # Slot is stable across resize, but re-publish defensively.
            node._texture_id = rend.texture_id
            # resize() replaced the render target, so the frame captured for a
            # "once" viewport is gone: clear the flag so render_all draws one
            # fresh frame into the new target instead of leaving it unrendered.
            node._svp_rendered_once = False
            self._stable_frames[key] = 0

    def _reap(self, live_keys: set[int]) -> None:
        """Destroy renderers whose SubViewport has left the tree."""
        stale = [k for k in self._renderers if k not in live_keys]
        for key in stale:
            self._renderers.pop(key).destroy()
            self._views.pop(key, None)
            self._nodes.pop(key, None)
            self._pending_size.pop(key, None)
            self._stable_frames.pop(key, None)

    @staticmethod
    def _collect_subviewports(root: Any, sub_viewport_type: type) -> list:
        """Find all SubViewports under *root*, but do NOT descend into them.

        A SubViewport's children belong to its own offscreen subtree, never the
        main pass, so we stop the walk at each SubViewport. *root* itself is
        always walked; if it is a SubViewport it is appended last. Returns an
        empty list when *root* is ``None``.
        """
        if root is None:
            return []
        found: list = []
        stack = [root]
        while stack:
            node = stack.pop()
            if node is not root and isinstance(node, sub_viewport_type):
                found.append(node)
                continue  # do not descend: children render offscreen
            stack.extend(node.children)
        # Root itself may be a SubViewport (unusual but valid). The walk above
        # never appends root, so no membership check is needed.
        if isinstance(root, sub_viewport_type):
            found.append(root)
        return found

    @staticmethod
    def _find_in_children(node: Any, typ: type) -> Any:
        """Return the first visible *typ* strictly below *node*, or ``None``."""
        for child in node.children:
            hit = SubViewportManager._first_of_type(child, typ)
            if hit is not None:
                return hit
        return None

    @staticmethod
    def _find_camera_3d(node: Any) -> Any:
        """Return the first visible Camera3D below *node*, or ``None``."""
        from simvx.core import Camera3D
        return SubViewportManager._find_in_children(node, Camera3D)

    @staticmethod
    def _find_camera_2d(node: Any) -> Any:
        """Return the first visible Camera2D below *node*, or ``None``."""
        from simvx.core import Camera2D
        return SubViewportManager._find_in_children(node, Camera2D)

    @staticmethod
    def _first_of_type(node: Any, typ: type) -> Any:
        """Return the first visible instance of *typ* in *node*'s subtree.

        The search is depth-first in tree order (a node before its children,
        children in their listed order), starting with *node* itself. A node
        whose ``_visible_in_hierarchy`` is false is not returned, but its
        children are still searched. Returns ``None`` when nothing matches.
        """
        stack = [node]
        while stack:
            current = stack.pop()
            if isinstance(current, typ) and getattr(current, "_visible_in_hierarchy", True):
                return current
            # Push children reversed so the LIFO stack yields them in listed
            # order; pushing them forward would return the *last* match.
            stack.extend(reversed(list(current.children)))
        return None

    def destroy(self) -> None:
        """Tear down all targets (call on app shutdown)."""
        for rend in self._renderers.values():
            rend.destroy()
        self._renderers.clear()
        self._views.clear()
        self._nodes.clear()
        self._pending_size.clear()
        self._stable_frames.clear()