"""Backend-agnostic per-frame ordering of SubViewport render targets.
A :class:`~simvx.core.SubViewport` renders its subtree into an offscreen
bindless texture each frame. When SubViewport *A* samples SubViewport *B*'s
texture (``material.albedo_tex_index = B.texture`` or ``sprite._texture_id =
B.texture``), *B* is a **producer** and *A* a **consumer**: *B* must render
first so *A* sees fresh content the same frame instead of last frame's (the
1-frame lag that flat discovery order causes).
:func:`order_subviewports` is the single canonical implementation both the
desktop (Vulkan) and web (WebGPU) backends call. It:
1. Builds ``{slot -> producer}`` from each live SubViewport's current bindless
slot (``node._texture_id``).
2. Scans each SubViewport's own subtree (stopping at nested SubViewport
boundaries) for any material / Sprite-like sampler slot equal to *another*
live SubViewport's slot, and unions the explicit ``node.feeds_from`` hints,
to build producer -> consumer edges.
3. Kahn topo-sorts so producers precede consumers (deterministic tiebreak by
discovery index).
4. On a genuine cycle (e.g. two mirrors facing each other) runs Tarjan SCC and
breaks the minimal back-edge(s) deterministically, so only the cyclic
boundary lags (the broken consumer reads last frame's texture, the natural
result of rendering it before its producer). It never raises.
**First-frame warmup.** Before the backend has assigned slots, every
``slot_of(node)`` returns ``-1``; no edges form, so the order is the flat
discovery order. That is correct: there is nothing to sample yet. From the
second frame on (once slots are assigned) the topological order takes effect,
so a producer -> consumer chain is lag-free from the first frame in which the
consumer actually samples its producer.
The function is dependency-light: it imports nothing from ``simvx.graphics``
and inspects only core data (``Material``, Sprite-like ``_texture_id``).
"""
from __future__ import annotations

import heapq
import logging
from collections.abc import Callable, Iterable
from typing import Any
log = logging.getLogger(__name__)
__all__ = ["order_subviewports", "scan_consumed_slots"]
def _material_slots(material: Any) -> Iterable[int]:
"""Yield every bindless sampler slot a single material references.
``Material`` keeps texture *sources* (paths / bytes / ndarrays) in its
``*_uri`` fields, which never hold a bindless slot. The one field that
carries a live bindless index is ``albedo_tex_index`` (set directly by a
backend or by a consumer binding ``svp.texture``). Scanning only it is
correct and unambiguous.
"""
slot = getattr(material, "albedo_tex_index", None)
if isinstance(slot, int):
yield slot
# Public API: scan_consumed_slots is exported via __all__.
def scan_consumed_slots(viewport: Any) -> set[int]:
    """Gather the bindless slots sampled by nodes beneath *viewport*.

    The walk starts at *viewport*'s children (so *viewport*'s own published
    slot is never read) and visits descendants depth-first. A nested
    SubViewport is a boundary: it is neither inspected nor descended into,
    because its consumption is scanned separately when that node is itself
    processed from the live list, which keeps each edge counted once.

    For every visited node two places are read:

    * ``node.material.albedo_tex_index`` (``MeshInstance3D`` and friends);
    * ``node._texture_id`` (Sprite2D / Sprite3D / MeshInstance2D / NinePatch /
      AnimatedSprite2D, whose drawn texture slot lives there).

    Args:
        viewport: The SubViewport whose subtree is scanned. A missing, ``None``
            or empty ``children`` attribute yields an empty result.

    Returns:
        Every integer slot found, negatives included; the caller is expected
        to filter them against its live-producer map.
    """

    def kids(node: Any) -> Any:
        # Missing / None / empty ``children`` all collapse to "no children".
        return getattr(node, "children", ()) or ()

    found: set[int] = set()
    pending = list(kids(viewport))
    while pending:
        node = pending.pop()
        if _is_subviewport(node):
            # Boundary: a nested SubViewport owns its own consumption.
            continue

        material = getattr(node, "material", None)
        if material is not None:
            for slot in _material_slots(material):
                found.add(slot)

        texture_id = getattr(node, "_texture_id", None)
        if isinstance(texture_id, int):
            found.add(texture_id)

        pending.extend(kids(node))
    return found
def _is_subviewport(node: Any) -> bool:
"""True if *node* is a SubViewport, by MRO name (no import needed)."""
return any(c.__name__ == "SubViewport" for c in type(node).__mro__)
def _normalise_feeds_from(node: Any) -> list[Any]:
"""Return the explicit producer list from ``node.feeds_from``.
Accepts a single SubViewport, an iterable of them, or ``None``/empty.
"""
feeds = getattr(node, "feeds_from", None)
if feeds is None:
return []
if _is_subviewport(feeds):
return [feeds]
try:
return [f for f in feeds if f is not None]
except TypeError:
return []
# Public API: order_subviewports is exported via __all__.
def order_subviewports(
    live: list[Any],
    slot_of: Callable[[Any], int],
    *,
    depth_cap: int = 1,
) -> tuple[list[Any], set[tuple[Any, Any]]]:
    """Return *live* SubViewports ordered so producers render before consumers.

    Args:
        live: SubViewport nodes in discovery order (the flat DFS order the
            backend already collects).
        slot_of: ``node -> int`` giving the node's current bindless slot
            (``node._texture_id``); a value ``< 0`` means "no slot yet"
            (first frame) and such nodes are not registered as producers.
        depth_cap: Bound on recursive scene-feedback depth (the
            ``WorldEnvironment.subviewport_feedback_depth`` property). Every
            node is scheduled exactly once whatever the value, so the cap only
            affects logging here: ``depth_cap <= 0`` is reported at debug
            level as clamped to ``1``. Higher caps are reserved for a future
            multi-pass feedback expansion.

    Returns:
        ``(ordered, lagged_edges)``. ``ordered`` holds the same nodes in
        render order. ``lagged_edges`` is the set of ``(producer, consumer)``
        pairs whose edge was broken to resolve a cycle (the consumer reads
        last frame's texture across that edge); it is empty in the acyclic
        case.

    Cycles never raise: they degrade to a one-frame-lagged edge.
    """
    count = len(live)
    if count <= 1:
        # Nothing to order against.
        return list(live), set()

    # Nodes are keyed by identity so they need not define equality.
    position = {id(viewport): idx for idx, viewport in enumerate(live)}

    # Slot -> producing node, restricted to valid (>= 0) integer slots. Should
    # two nodes ever report the same slot, the earlier one in discovery order
    # is kept (bindless slots are unique, so this is purely defensive).
    slot_owner: dict[int, Any] = {}
    for viewport in live:
        published = slot_of(viewport)
        if not isinstance(published, int) or published < 0:
            continue
        if published not in slot_owner:
            slot_owner[published] = viewport

    # Dependency edges as (producer_index, consumer_index) pairs.
    links: set[tuple[int, int]] = set()

    def link(source: Any, sink: Any) -> None:
        src = position.get(id(source))
        dst = position.get(id(sink))
        # Ignore nodes outside *live* and self-references.
        if src is not None and dst is not None and src != dst:
            links.add((src, dst))

    for sink in live:
        # Implicit edges: a slot sampled inside this subtree that belongs to
        # another live SubViewport.
        for sampled in scan_consumed_slots(sink):
            source = slot_owner.get(sampled)
            if source is None or source is sink:
                continue
            link(source, sink)
        # Explicit edges: ``feeds_from`` hints, unioned with the implicit ones.
        for source in _normalise_feeds_from(sink):
            link(source, sink)

    # Kahn topological sort; cycles are resolved by breaking back-edges.
    render_idx, lagged_idx = _topo_sort_break_cycles(count, links)
    ordered = [live[i] for i in render_idx]
    lagged_edges = {(live[src], live[dst]) for src, dst in lagged_idx}

    # The cap is honoured by construction (one scheduling per node). A cap of
    # zero or less would mean "do not render"; it is treated as 1 so the slot
    # stays valid, matching the once-per-frame default.
    if depth_cap <= 0:
        log.debug("subviewport_feedback_depth <= 0 clamped to 1 (render once)")
    return ordered, lagged_edges
def _topo_sort_break_cycles(
n: int, edges: set[tuple[int, int]]
) -> tuple[list[int], set[tuple[int, int]]]:
"""Kahn topo-sort over node indices ``0..n-1`` with deterministic tiebreak.
On a cycle, finds SCCs (Tarjan), breaks the minimal deterministic back-edge
in each non-trivial SCC, and retries. Returns ``(ordered_indices,
broken_edges)``.
"""
broken: set[tuple[int, int]] = set()
work = set(edges)
while True:
adj: dict[int, list[int]] = {i: [] for i in range(n)}
indeg = [0] * n
for p, c in work:
adj[p].append(c)
indeg[c] += 1
# Kahn with a sorted frontier for a deterministic order (tiebreak by
# discovery index, which is the node index here).
queue = sorted(i for i in range(n) if indeg[i] == 0)
ordered: list[int] = []
while queue:
node = queue.pop(0)
ordered.append(node)
grew = False
for nxt in sorted(adj[node]):
indeg[nxt] -= 1
if indeg[nxt] == 0:
queue.append(nxt)
grew = True
if grew:
queue.sort()
if len(ordered) == n:
return ordered, broken
# Cycle remains: break one minimal back-edge in a non-trivial SCC.
back_edge = _pick_back_edge(n, work)
if back_edge is None:
# Should not happen (len < n implies a cycle), but never loop.
remaining = sorted(set(range(n)) - set(ordered))
return ordered + remaining, broken
work.discard(back_edge)
broken.add(back_edge)
def _pick_back_edge(n: int, edges: set[tuple[int, int]]) -> tuple[int, int] | None:
"""Return one deterministic back-edge inside a non-trivial SCC, or None.
Uses Tarjan to find SCCs, picks the lowest-index non-trivial SCC, and
within it the lexicographically smallest edge that points "backwards"
(toward an equal-or-lower index member), which is guaranteed to exist in a
cycle. Removing it strictly reduces the cycle without orphaning producers
outside the SCC.
"""
sccs = _tarjan_scc(n, edges)
# Deterministic: consider SCCs ordered by their smallest member.
for scc in sorted((sorted(s) for s in sccs if len(s) > 1), key=lambda s: s[0]):
members = set(scc)
in_scc = sorted((p, c) for (p, c) in edges if p in members and c in members)
# Prefer an edge that goes to an equal-or-lower index (a true back-edge
# in the deterministic index order); fall back to the smallest in-SCC
# edge so we always make progress.
for p, c in in_scc:
if c <= p:
return (p, c)
if in_scc:
return in_scc[0]
return None
def _tarjan_scc(n: int, edges: set[tuple[int, int]]) -> list[set[int]]:
"""Tarjan's strongly-connected-components, iterative (no recursion limit)."""
adj: dict[int, list[int]] = {i: [] for i in range(n)}
for p, c in edges:
adj[p].append(c)
for i in range(n):
adj[i].sort()
index_counter = 0
indices: dict[int, int] = {}
lowlink: dict[int, int] = {}
on_stack: set[int] = set()
stack: list[int] = []
result: list[set[int]] = []
for start in range(n):
if start in indices:
continue
# Iterative DFS with an explicit work stack of (node, child_iterator).
work: list[tuple[int, int]] = [(start, 0)]
while work:
node, child_i = work[-1]
if child_i == 0:
indices[node] = lowlink[node] = index_counter
index_counter += 1
stack.append(node)
on_stack.add(node)
if child_i < len(adj[node]):
work[-1] = (node, child_i + 1)
nxt = adj[node][child_i]
if nxt not in indices:
work.append((nxt, 0))
elif nxt in on_stack:
lowlink[node] = min(lowlink[node], indices[nxt])
else:
if lowlink[node] == indices[node]:
comp: set[int] = set()
while True:
w = stack.pop()
on_stack.discard(w)
comp.add(w)
if w == node:
break
result.append(comp)
work.pop()
if work:
parent = work[-1][0]
lowlink[parent] = min(lowlink[parent], lowlink[node])
return result