Source code for simvx.core._subviewport_order

"""Backend-agnostic per-frame ordering of SubViewport render targets.

A :class:`~simvx.core.SubViewport` renders its subtree into an offscreen
bindless texture each frame. When SubViewport *A* samples SubViewport *B*'s
texture (``material.albedo_tex_index = B.texture`` or ``sprite._texture_id =
B.texture``), *B* is a **producer** and *A* a **consumer**: *B* must render
first so *A* sees fresh content the same frame instead of last frame's (the
1-frame lag that flat discovery order causes).

:func:`order_subviewports` is the single canonical implementation both the
desktop (Vulkan) and web (WebGPU) backends call. It:

1. Builds ``{slot -> producer}`` from each live SubViewport's current bindless
   slot (``node._texture_id``).
2. Scans each SubViewport's own subtree (stopping at nested SubViewport
   boundaries) for any material / Sprite-like sampler slot equal to *another*
   live SubViewport's slot, and unions the explicit ``node.feeds_from`` hints,
   to build producer -> consumer edges.
3. Kahn topo-sorts so producers precede consumers (deterministic tiebreak by
   discovery index).
4. On a genuine cycle (e.g. two mirrors facing each other) runs Tarjan SCC and
   breaks the minimal back-edge(s) deterministically, so only the cyclic
   boundary lags (the broken consumer reads last frame's texture, the natural
   result of rendering it before its producer). It never raises.

**First-frame warmup.** Before the backend has assigned slots, every
``slot_of(node)`` returns ``-1``; no edges form, so the order is the flat
discovery order. That is correct: there is nothing to sample yet. From the
second frame on (slots assigned) the topological order takes effect, so a
producer -> consumer chain is lag-free from frame 1 of actually sampling.

The function is dependency-light: it imports nothing from ``simvx.graphics``
and inspects only core data (``Material``, Sprite-like ``_texture_id``).
"""

from __future__ import annotations

import heapq
import logging
from collections.abc import Callable, Iterable
from typing import Any

log = logging.getLogger(__name__)

__all__ = ["order_subviewports", "scan_consumed_slots"]


def _material_slots(material: Any) -> Iterable[int]:
    """Yield every bindless sampler slot a single material references.

    ``Material`` keeps texture *sources* (paths / bytes / ndarrays) in its
    ``*_uri`` fields, which never hold a bindless slot. The one field that
    carries a live bindless index is ``albedo_tex_index`` (set directly by a
    backend or by a consumer binding ``svp.texture``). Scanning only it is
    correct and unambiguous.
    """
    slot = getattr(material, "albedo_tex_index", None)
    if isinstance(slot, int):
        yield slot


def scan_consumed_slots(viewport: Any) -> set[int]:
    """Gather the bindless slots sampled by the descendants of *viewport*.

    The walk is an iterative depth-first traversal that starts at
    ``viewport.children`` and **does not descend into nested SubViewports**:
    a nested SubViewport is skipped entirely (neither it nor its subtree is
    inspected), because its consumption forms its own edges when that node is
    processed by the caller.

    For every visited node two attributes are read:

    * ``node.material.albedo_tex_index`` (via :func:`_material_slots`);
    * ``node._texture_id`` when it is an ``int``.

    Returns:
        The set of integer slots found. Negative values are kept; filtering
        against the live-producer map is left to the caller. The *viewport*
        node itself is never inspected, only its descendants.
    """
    found: set[int] = set()
    pending = [*(getattr(viewport, "children", ()) or ())]
    while pending:
        node = pending.pop()
        if not _is_subviewport(node):
            material = getattr(node, "material", None)
            if material is not None:
                found |= set(_material_slots(material))
            texture_slot = getattr(node, "_texture_id", None)
            if isinstance(texture_slot, int):
                found.add(texture_slot)
            # ``children`` may be missing or None; both mean "no children".
            pending += getattr(node, "children", ()) or ()
    return found
def _is_subviewport(node: Any) -> bool: """True if *node* is a SubViewport, by MRO name (no import needed).""" return any(c.__name__ == "SubViewport" for c in type(node).__mro__) def _normalise_feeds_from(node: Any) -> list[Any]: """Return the explicit producer list from ``node.feeds_from``. Accepts a single SubViewport, an iterable of them, or ``None``/empty. """ feeds = getattr(node, "feeds_from", None) if feeds is None: return [] if _is_subviewport(feeds): return [feeds] try: return [f for f in feeds if f is not None] except TypeError: return []
def order_subviewports(
    live: list[Any],
    slot_of: Callable[[Any], int],
    *,
    depth_cap: int = 1,
) -> tuple[list[Any], set[tuple[Any, Any]]]:
    """Return *live* SubViewports in an order where producers render first.

    Args:
        live: SubViewport nodes in discovery order (the flat DFS order the
            backend already collects).
        slot_of: ``node -> int`` giving the node's current bindless slot;
            a value ``< 0`` means "no slot yet" and such nodes are not
            registered as producers.
        depth_cap: Bound on recursive scene-feedback depth (the
            ``WorldEnvironment.subviewport_feedback_depth`` property). It does
            not alter the ordering: every node is scheduled exactly once. A
            value ``<= 0`` is treated as ``1`` and only triggers a debug log
            message.

    Returns:
        ``(ordered, lagged_edges)``. ``ordered`` holds the same nodes in
        render order. ``lagged_edges`` is the set of ``(producer, consumer)``
        pairs whose edge was dropped to resolve a cycle (the consumer reads
        last frame's texture across that edge); it is empty for acyclic
        graphs. With zero or one live node the input order is returned as-is.

    Cycles never raise: they degrade to a lagged edge.
    """
    count = len(live)
    if count <= 1:
        return list(live), set()

    # Nodes are keyed by identity so they need not be hashable for lookups.
    position = {id(vp): idx for idx, vp in enumerate(live)}

    # (a) Map each valid (>= 0) slot to the node publishing it. Bindless slots
    # are expected to be unique; on a clash the earliest-discovered node wins.
    owner_of_slot: dict[int, Any] = {}
    for vp in live:
        published = slot_of(vp)
        if not isinstance(published, int) or published < 0:
            continue
        if published not in owner_of_slot:
            owner_of_slot[published] = vp

    # (b) Collect producer -> consumer links as (producer_index, consumer_index).
    links: set[tuple[int, int]] = set()

    def _link(src: Any, dst: Any) -> None:
        src_i = position.get(id(src))
        dst_i = position.get(id(dst))
        # Ignore endpoints that are not live and self-references.
        if src_i is not None and dst_i is not None and src_i != dst_i:
            links.add((src_i, dst_i))

    for reader in live:
        # Implicit links: slots sampled inside this subtree that belong to a
        # different live SubViewport.
        for used in scan_consumed_slots(reader):
            source = owner_of_slot.get(used)
            if source is None or source is reader:
                continue
            _link(source, reader)
        # Explicit links: ``feeds_from`` hints are unioned with the implicit ones.
        for source in _normalise_feeds_from(reader):
            _link(source, reader)

    # (c) + (d) Topologically sort, breaking back-edges where cycles exist.
    order_idx, cut_idx = _topo_sort_break_cycles(count, links)
    ordered = [live[i] for i in order_idx]
    lagged_edges = {(live[a], live[b]) for a, b in cut_idx}

    # Every node is scheduled exactly once, so any cap >= 1 is satisfied by
    # construction; a non-positive cap is clamped to that once-per-frame
    # behaviour and merely reported.
    if depth_cap <= 0:
        log.debug("subviewport_feedback_depth <= 0 clamped to 1 (render once)")
    return ordered, lagged_edges
def _topo_sort_break_cycles( n: int, edges: set[tuple[int, int]] ) -> tuple[list[int], set[tuple[int, int]]]: """Kahn topo-sort over node indices ``0..n-1`` with deterministic tiebreak. On a cycle, finds SCCs (Tarjan), breaks the minimal deterministic back-edge in each non-trivial SCC, and retries. Returns ``(ordered_indices, broken_edges)``. """ broken: set[tuple[int, int]] = set() work = set(edges) while True: adj: dict[int, list[int]] = {i: [] for i in range(n)} indeg = [0] * n for p, c in work: adj[p].append(c) indeg[c] += 1 # Kahn with a sorted frontier for a deterministic order (tiebreak by # discovery index, which is the node index here). queue = sorted(i for i in range(n) if indeg[i] == 0) ordered: list[int] = [] while queue: node = queue.pop(0) ordered.append(node) grew = False for nxt in sorted(adj[node]): indeg[nxt] -= 1 if indeg[nxt] == 0: queue.append(nxt) grew = True if grew: queue.sort() if len(ordered) == n: return ordered, broken # Cycle remains: break one minimal back-edge in a non-trivial SCC. back_edge = _pick_back_edge(n, work) if back_edge is None: # Should not happen (len < n implies a cycle), but never loop. remaining = sorted(set(range(n)) - set(ordered)) return ordered + remaining, broken work.discard(back_edge) broken.add(back_edge) def _pick_back_edge(n: int, edges: set[tuple[int, int]]) -> tuple[int, int] | None: """Return one deterministic back-edge inside a non-trivial SCC, or None. Uses Tarjan to find SCCs, picks the lowest-index non-trivial SCC, and within it the lexicographically smallest edge that points "backwards" (toward an equal-or-lower index member), which is guaranteed to exist in a cycle. Removing it strictly reduces the cycle without orphaning producers outside the SCC. """ sccs = _tarjan_scc(n, edges) # Deterministic: consider SCCs ordered by their smallest member. 
for scc in sorted((sorted(s) for s in sccs if len(s) > 1), key=lambda s: s[0]): members = set(scc) in_scc = sorted((p, c) for (p, c) in edges if p in members and c in members) # Prefer an edge that goes to an equal-or-lower index (a true back-edge # in the deterministic index order); fall back to the smallest in-SCC # edge so we always make progress. for p, c in in_scc: if c <= p: return (p, c) if in_scc: return in_scc[0] return None def _tarjan_scc(n: int, edges: set[tuple[int, int]]) -> list[set[int]]: """Tarjan's strongly-connected-components, iterative (no recursion limit).""" adj: dict[int, list[int]] = {i: [] for i in range(n)} for p, c in edges: adj[p].append(c) for i in range(n): adj[i].sort() index_counter = 0 indices: dict[int, int] = {} lowlink: dict[int, int] = {} on_stack: set[int] = set() stack: list[int] = [] result: list[set[int]] = [] for start in range(n): if start in indices: continue # Iterative DFS with an explicit work stack of (node, child_iterator). work: list[tuple[int, int]] = [(start, 0)] while work: node, child_i = work[-1] if child_i == 0: indices[node] = lowlink[node] = index_counter index_counter += 1 stack.append(node) on_stack.add(node) if child_i < len(adj[node]): work[-1] = (node, child_i + 1) nxt = adj[node][child_i] if nxt not in indices: work.append((nxt, 0)) elif nxt in on_stack: lowlink[node] = min(lowlink[node], indices[nxt]) else: if lowlink[node] == indices[node]: comp: set[int] = set() while True: w = stack.pop() on_stack.discard(w) comp.add(w) if w == node: break result.append(comp) work.pop() if work: parent = work[-1][0] lowlink[parent] = min(lowlink[parent], lowlink[node]) return result