# Source code for simvx.graphics.renderer.async_compute

"""Async-compute scheduler: route compute passes to a dedicated queue when one exists.

The renderer records several compute passes per frame (occlusion cull, Hi-Z
build, particle simulation, light cull). On a GPU with a *dedicated* compute
queue family (COMPUTE without GRAPHICS), those passes can run on a separate
queue and overlap graphics work; the graphics submit then waits on a
compute-done semaphore before its consumers (draw-indirect / vertex / depth
sample) read the compute results.

``AsyncComputeScheduler`` is the single seam that hides this. Two modes,
selected once at construction from :py:attr:`GPUContext.async_compute`:

- **passthrough** (no dedicated compute queue, e.g. integrated GPUs / this dev
  box): :meth:`record_compute` records straight into the primary graphics
  command buffer the caller passes in, :meth:`submit` is a no-op, and
  :meth:`graphics_wait_semaphores` is empty. The frame is byte-identical to the
  pre-async path: the scheduler adds nothing to the command stream.
- **async** (dedicated compute queue): :meth:`begin_frame` opens a per-frame
  compute command buffer on the compute queue's pool; :meth:`record_compute`
  records into *that* buffer (ignoring the graphics cmd argument); :meth:`submit`
  ends and submits it to the compute queue signalling the frame's compute-done
  semaphore; :meth:`graphics_wait_semaphores` returns that semaphore (with its
  wait stage) so the graphics submit blocks until compute results are visible.

Cross-queue *data* visibility for the shared control buffers is handled at
buffer-creation time via ``VK_SHARING_MODE_CONCURRENT`` (see
``GPUContext.concurrent_compute_families`` and ``memory.create_buffer``); this
scheduler owns only the *execution* ordering (command buffer + semaphore).

Routing the actual passes through this scheduler is a separate wave; this module
provides the gated abstraction and is a no-op until a pass calls
:meth:`record_compute`.
"""

from __future__ import annotations

import logging
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

import vulkan as vk

from ..gpu.commands import CommandContext

if TYPE_CHECKING:
    from ..gpu.context import GPUContext

__all__ = ["AsyncComputeScheduler"]

log = logging.getLogger(__name__)


class AsyncComputeScheduler:
    """Records + submits compute passes on a dedicated queue when available.

    Construct once per Engine. ``async_enabled`` reflects the GPU: when
    ``False`` every method degrades to the single-queue passthrough.

    Per-frame call order: :meth:`begin_frame`, then any number of
    :meth:`record_compute`, then :meth:`submit`, then merge
    :meth:`graphics_wait_semaphores` into the graphics submit. :meth:`submit`
    closes the frame's recording window in both modes. A stray
    :meth:`record_compute` after it therefore raises instead of recording
    into an ended (async) command buffer.
    """

    # UINT64_MAX nanoseconds: vkWaitForFences timeout used as "no timeout".
    _NO_TIMEOUT = 0xFFFFFFFFFFFFFFFF

    def __init__(self, ctx: GPUContext, frames_in_flight: int) -> None:
        """Pick the mode from ``ctx.async_compute`` and allocate async resources.

        Args:
            ctx: GPU context. This class reads ``async_compute``,
                ``compute_qf``, ``device`` and ``compute_queue`` from it.
            frames_in_flight: Number of per-frame slots. Frame indices passed
                to :meth:`begin_frame` are taken modulo this value.

        Raises:
            ValueError: If ``frames_in_flight`` is less than 1.
            RuntimeError: On the async path when ``ctx.compute_qf`` is None.
        """
        if frames_in_flight < 1:
            # Fail at construction rather than with ZeroDivisionError in begin_frame.
            raise ValueError(f"frames_in_flight must be >= 1, got {frames_in_flight!r}")
        self._ctx = ctx
        self._frames = frames_in_flight
        self.async_enabled: bool = ctx.async_compute
        # Per-frame compute command buffers, compute-done semaphores and
        # retirement fences, allocated only on the async path. The passthrough
        # path uses the graphics command buffer the caller already owns and
        # never touches these. They stay empty/None and cost nothing on GPUs
        # without a dedicated compute queue.
        self._cmd_ctx: CommandContext | None = None
        self._cmd_buffers: list[Any] = []
        self._semaphores: list[Any] = []
        self._fences: list[Any] = []
        self._frame: int = 0
        # Command buffer that record_compute records into. In passthrough mode
        # this is the caller's graphics cmd; in async mode it is the slot's
        # compute cmd. It is None outside a begin_frame/submit window.
        self._active_cmd: Any = None
        # Set when any pass recorded this frame. Not consulted by this class.
        self._recorded: bool = False
        if self.async_enabled:
            self._create_resources()
            log.debug("AsyncComputeScheduler: dedicated compute queue (qf=%s)", ctx.compute_qf)
        else:
            log.debug("AsyncComputeScheduler: passthrough (no dedicated compute queue)")

    # ------------------------------------------------------------------ setup

    def _create_resources(self) -> None:
        """Allocate per-frame compute command buffers, semaphores and fences.

        Raises:
            RuntimeError: If the context exposes no compute queue family.
        """
        if self._ctx.compute_qf is None:
            # Explicit raise rather than ``assert`` so the check survives ``python -O``.
            raise RuntimeError("AsyncComputeScheduler: async_compute is set but ctx.compute_qf is None")
        device = self._ctx.device
        self._cmd_ctx = CommandContext(device, self._ctx.compute_qf)
        self._cmd_ctx.create_pool()
        self._cmd_buffers = self._cmd_ctx.allocate(self._frames)
        sem_info = vk.VkSemaphoreCreateInfo()
        self._semaphores = [vk.vkCreateSemaphore(device, sem_info, None) for _ in range(self._frames)]
        # Per-frame fence on the compute submit. It proves a frame's compute
        # command buffer has retired before it is reset and re-recorded
        # _frames later. The graphics fence only gates the graphics queue,
        # and the compute queue runs on its own timeline. Without this fence,
        # vkResetCommandBuffer could hit an in-flight compute cmd (undefined
        # behaviour). Created SIGNALED so the first begin_frame for each slot
        # does not block.
        fence_info = vk.VkFenceCreateInfo(flags=vk.VK_FENCE_CREATE_SIGNALED_BIT)
        self._fences = [vk.vkCreateFence(device, fence_info, None) for _ in range(self._frames)]

    # ------------------------------------------------------------------ frame

    def begin_frame(self, frame_index: int, graphics_cmd: Any) -> None:
        """Open recording for ``frame_index``.

        ``graphics_cmd`` is the primary command buffer for this frame. In
        passthrough mode compute passes are recorded into it directly. In
        async mode it is unused, because compute records into the dedicated
        compute cmd, but it is still passed so the call site is uniform.
        """
        self._frame = frame_index % self._frames
        self._recorded = False
        if not self.async_enabled:
            self._active_cmd = graphics_cmd
            return
        # Ensure this slot's previous compute submit has retired before reusing
        # its command buffer. The fence is deliberately NOT reset here:
        # submit() resets it right before vkQueueSubmit. A frame abandoned
        # between begin_frame and submit therefore leaves the fence signalled,
        # and neither the next begin_frame for this slot nor destroy() can
        # block forever on it.
        fence = self._fences[self._frame]
        vk.vkWaitForFences(self._ctx.device, 1, [fence], vk.VK_TRUE, self._NO_TIMEOUT)
        cmd = self._cmd_buffers[self._frame]
        # vkResetCommandBuffer accepts any non-pending state. That includes a
        # buffer an abandoned frame left in the recording state.
        vk.vkResetCommandBuffer(cmd, 0)
        # The buffer is re-recorded every frame, so it is submitted once per recording.
        begin_info = vk.VkCommandBufferBeginInfo(flags=vk.VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT)
        vk.vkBeginCommandBuffer(cmd, begin_info)
        self._active_cmd = cmd

    def record_compute(self, record_fn: Callable[[Any], None]) -> None:
        """Record a compute pass.

        ``record_fn(cmd)`` issues the dispatch(es) and any *intra*-compute
        barriers. In async mode ``cmd`` is the dedicated compute command
        buffer. In passthrough mode it is the primary graphics command buffer,
        so the pass records inline exactly as it does today. Producer->consumer
        cross-queue ordering comes from the compute-done semaphore on the
        async path. The pass's own intra-queue barriers remain valid in both
        modes.

        Raises:
            RuntimeError: If called before :meth:`begin_frame` or after
                :meth:`submit` for the current frame.
        """
        cmd = self._active_cmd
        if cmd is None:
            raise RuntimeError("AsyncComputeScheduler.record_compute called before begin_frame (or after submit)")
        record_fn(cmd)
        self._recorded = True

    def submit(self) -> None:
        """Submit the compute command buffer (async) or no-op (passthrough).

        In both modes this closes the frame's recording window, so a later
        :meth:`record_compute` raises. In async mode the compute cmd is ended
        and submitted to the compute queue. The submit signals this frame's
        compute-done semaphore, which the graphics submit waits on. With no
        recorded work the semaphore is still signalled (empty submit), so the
        graphics wait never deadlocks.

        Raises:
            RuntimeError: In async mode, if there is no open recording (no
                :meth:`begin_frame`, or :meth:`submit` already called this frame).
        """
        cmd = self._active_cmd
        self._active_cmd = None
        if not self.async_enabled:
            return
        if cmd is None:
            # Ending/submitting twice would re-end an executable buffer and
            # re-signal the semaphore. Both are invalid Vulkan usage.
            raise RuntimeError("AsyncComputeScheduler.submit called without a matching begin_frame")
        vk.vkEndCommandBuffer(cmd)
        fence = self._fences[self._frame]
        # Reset only now that a submit that re-signals the fence is about to happen.
        vk.vkResetFences(self._ctx.device, 1, [fence])
        submit_info = vk.VkSubmitInfo(
            commandBufferCount=1,
            pCommandBuffers=[cmd],
            signalSemaphoreCount=1,
            pSignalSemaphores=[self._semaphores[self._frame]],
        )
        vk.vkQueueSubmit(self._ctx.compute_queue, 1, [submit_info], fence)

    def graphics_wait_semaphores(self) -> tuple[list[Any], list[int]]:
        """Semaphores + per-semaphore wait stages for the graphics submit.

        Returns ``(semaphores, wait_stages)`` to merge into the frame's
        ``VkSubmitInfo``. The graphics queue then blocks on compute results
        before the indirect-draw / vertex / fragment stages consume them.
        Empty in passthrough mode, so the submit is identical to today.
        """
        if not self.async_enabled:
            return [], []
        # DRAW_INDIRECT covers vkCmdDrawIndexedIndirect reading instance_count;
        # VERTEX_SHADER covers particle buffers read in the vertex stage;
        # FRAGMENT_SHADER covers light-grid / depth sampled in fragment.
        stage = (
            vk.VK_PIPELINE_STAGE_DRAW_INDIRECT_BIT
            | vk.VK_PIPELINE_STAGE_VERTEX_SHADER_BIT
            | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT
        )
        return [self._semaphores[self._frame]], [stage]

    def destroy(self) -> None:
        """Release the async-path Vulkan objects. This is a no-op in passthrough mode.

        It first waits for every compute submit issued by this scheduler to
        retire, because destroying a fence, semaphore or command pool still in
        use by a pending submission is invalid. Graphics-queue submits that
        wait on these semaphores are not tracked here; callers should ensure
        those have retired (e.g. device idle) first. Safe to call more than once.
        """
        self._active_cmd = None
        if self._fences:
            # vkWaitForFences requires fenceCount > 0, hence the guard.
            vk.vkWaitForFences(self._ctx.device, len(self._fences), list(self._fences), vk.VK_TRUE, self._NO_TIMEOUT)
        for sem in self._semaphores:
            vk.vkDestroySemaphore(self._ctx.device, sem, None)
        self._semaphores.clear()
        for fence in self._fences:
            vk.vkDestroyFence(self._ctx.device, fence, None)
        self._fences.clear()
        self._cmd_buffers.clear()
        if self._cmd_ctx is not None:
            self._cmd_ctx.destroy()
            self._cmd_ctx = None