"""Async-compute scheduler: route compute passes to a dedicated queue when one exists.
The renderer records several compute passes per frame (occlusion cull, Hi-Z
build, particle simulation, light cull). On a GPU with a *dedicated* compute
queue family (COMPUTE without GRAPHICS), those passes can run on a separate
queue and overlap graphics work; the graphics submit then waits on a
compute-done semaphore before its consumers (draw-indirect / vertex / depth
sample) read the compute results.
``AsyncComputeScheduler`` is the single seam that hides this. Two modes,
selected once at construction from :py:attr:`GPUContext.async_compute`:
- **passthrough** (no dedicated compute queue, e.g. integrated GPUs / this dev
box): :meth:`record_compute` records straight into the primary graphics
command buffer the caller passes in, :meth:`submit` is a no-op, and
:meth:`graphics_wait_semaphores` is empty. The frame is byte-identical to the
pre-async path: the scheduler adds nothing to the command stream.
- **async** (dedicated compute queue): :meth:`begin_frame` opens a per-frame
compute command buffer on the compute queue's pool; :meth:`record_compute`
records into *that* buffer (ignoring the graphics cmd argument); :meth:`submit`
ends and submits it to the compute queue signalling the frame's compute-done
semaphore; :meth:`graphics_wait_semaphores` returns that semaphore (with its
wait stage) so the graphics submit blocks until compute results are visible.
Cross-queue *data* visibility for the shared control buffers is handled at
buffer-creation time via ``VK_SHARING_MODE_CONCURRENT`` (see
``GPUContext.concurrent_compute_families`` and ``memory.create_buffer``); this
scheduler owns only the *execution* ordering (command buffer + semaphore).
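Routing the actual passes through this scheduler is separate follow-up work;
this module provides the gated abstraction. In passthrough mode it never adds
commands to a frame; in async mode, once the engine drives it with
:meth:`begin_frame` / :meth:`submit`, each frame submits a (possibly empty)
compute command buffer even before any pass calls :meth:`record_compute`.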
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
import vulkan as vk
from ..gpu.commands import CommandContext
if TYPE_CHECKING:
    # Needed only for the ``GPUContext`` annotation (annotations are postponed
    # via ``from __future__ import annotations``), so the context module is
    # not imported at runtime.
    from ..gpu.context import GPUContext
# Public API of this module.
__all__ = ["AsyncComputeScheduler"]
# Module logger; the scheduler logs which mode it selected at construction.
log = logging.getLogger(__name__)
[docs]
class AsyncComputeScheduler:
    """Records + submits compute passes on a dedicated queue when available.

    Construct once per Engine. ``async_enabled`` reflects the GPU: when
    ``False`` every method degrades to the single-queue passthrough.

    Per-frame call order: :meth:`begin_frame`, any number of
    :meth:`record_compute`, then :meth:`submit`. Merge
    :meth:`graphics_wait_semaphores` into the graphics ``VkSubmitInfo`` and
    queue that graphics submit *after* :meth:`submit` (a binary-semaphore
    wait requires its signal operation to have been submitted already).
    Call :meth:`destroy` once at shutdown.
    """

    # "Wait forever" timeout for vkWaitForFences (UINT64_MAX nanoseconds).
    _FENCE_TIMEOUT_NS = 0xFFFFFFFFFFFFFFFF

    def __init__(self, ctx: GPUContext, frames_in_flight: int) -> None:
        """Select the mode from ``ctx`` and allocate async-path resources.

        Args:
            ctx: the GPU context. ``ctx.async_compute`` picks async vs
                passthrough; the async path also reads ``device``,
                ``compute_qf`` and ``compute_queue`` from it.
            frames_in_flight: number of per-frame slots. ``frame_index``
                values passed to :meth:`begin_frame` are reduced modulo this.

        Raises:
            ValueError: if ``frames_in_flight`` is less than 1 (the slot
                index ``frame_index % frames_in_flight`` would be undefined).
        """
        if frames_in_flight < 1:
            raise ValueError(
                f"AsyncComputeScheduler: frames_in_flight must be >= 1, got {frames_in_flight}"
            )
        self._ctx = ctx
        self._frames = frames_in_flight
        self.async_enabled: bool = ctx.async_compute
        # Per-frame compute command buffers, compute-done semaphores and
        # retire fences, allocated only on the async path. The passthrough
        # path records into the graphics command buffer the caller already
        # owns, so these stay empty/None and cost nothing there.
        self._cmd_ctx: CommandContext | None = None
        self._cmd_buffers: list[Any] = []
        self._semaphores: list[Any] = []
        self._fences: list[Any] = []
        # Slot index (frame_index % frames_in_flight) of the current frame.
        self._frame: int = 0
        # Command buffer record_compute records into: the caller's graphics
        # cmd in passthrough mode, this slot's compute cmd in async mode.
        # None before the first begin_frame and, in async mode, after submit().
        self._active_cmd: Any = None
        # Set once record_compute has run in the current frame (informational;
        # not read by this class).
        self._recorded: bool = False
        if self.async_enabled:
            self._create_resources()
            log.debug("AsyncComputeScheduler: dedicated compute queue (qf=%s)", ctx.compute_qf)
        else:
            log.debug("AsyncComputeScheduler: passthrough (no dedicated compute queue)")

    # ------------------------------------------------------------------ setup
    def _create_resources(self) -> None:
        """Create the compute command pool, per-frame cmd buffers, semaphores and fences.

        Raises:
            RuntimeError: if the context reports async compute but has no
                compute queue family index.
        """
        compute_qf = self._ctx.compute_qf
        if compute_qf is None:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            raise RuntimeError(
                "AsyncComputeScheduler: async_compute is set but GPUContext.compute_qf is None"
            )
        device = self._ctx.device
        self._cmd_ctx = CommandContext(device, compute_qf)
        self._cmd_ctx.create_pool()
        self._cmd_buffers = self._cmd_ctx.allocate(self._frames)
        sem_info = vk.VkSemaphoreCreateInfo()
        self._semaphores = [vk.vkCreateSemaphore(device, sem_info, None) for _ in range(self._frames)]
        # Per-frame fence on the compute submit so a frame's compute command
        # buffer is proven retired before it is reset+re-recorded _frames later.
        # The graphics fence only gates the graphics queue; the compute queue
        # runs on its own timeline, so without this fence vkResetCommandBuffer
        # could hit an in-flight compute cmd (undefined behaviour). Created
        # SIGNALED so the first begin_frame for each slot does not block.
        fence_info = vk.VkFenceCreateInfo(flags=vk.VK_FENCE_CREATE_SIGNALED_BIT)
        self._fences = [vk.vkCreateFence(device, fence_info, None) for _ in range(self._frames)]

    # ------------------------------------------------------------------ frame
    def begin_frame(self, frame_index: int, graphics_cmd: Any) -> None:
        """Open recording for ``frame_index``.

        ``graphics_cmd`` is the primary command buffer for this frame; in
        passthrough mode compute passes are recorded into it directly. In async
        mode it is unused (compute records into the dedicated compute cmd) but is
        still passed for a uniform call site.

        In async mode this blocks until the slot's previous compute submit
        has retired, then resets and begins that slot's compute cmd.
        """
        self._frame = frame_index % self._frames
        self._recorded = False
        if not self.async_enabled:
            self._active_cmd = graphics_cmd
            return
        # Wait for this slot's previous compute submit to retire before reusing
        # its command buffer. The fence is deliberately NOT reset here: submit()
        # resets it immediately before the vkQueueSubmit that re-signals it. A
        # frame that is opened but never submitted therefore leaves the fence
        # signalled, and the next wait on this slot cannot hang forever.
        fence = self._fences[self._frame]
        vk.vkWaitForFences(self._ctx.device, 1, [fence], vk.VK_TRUE, self._FENCE_TIMEOUT_NS)
        cmd = self._cmd_buffers[self._frame]
        # NOTE(review): vkResetCommandBuffer requires the pool to have been
        # created with VK_COMMAND_POOL_CREATE_RESET_COMMAND_BUFFER_BIT; assumes
        # CommandContext.create_pool does so — confirm.
        vk.vkResetCommandBuffer(cmd, 0)
        vk.vkBeginCommandBuffer(cmd, vk.VkCommandBufferBeginInfo())
        self._active_cmd = cmd

    def record_compute(self, record_fn: Callable[[Any], None]) -> None:
        """Record a compute pass.

        ``record_fn(cmd)`` issues the dispatch(es) and any *intra*-compute
        barriers. In async mode ``cmd`` is the dedicated compute command buffer;
        in passthrough mode it is the primary graphics command buffer (so the
        pass records inline exactly as it does today). Producer->consumer
        cross-queue ordering is provided by the compute-done semaphore on the
        async path; the pass's own intra-queue barriers remain valid in both.

        Raises:
            RuntimeError: if no command buffer is open, i.e. before the first
                :meth:`begin_frame` or, in async mode, after :meth:`submit`
                (the compute cmd has already been ended and submitted).
        """
        if self._active_cmd is None:
            raise RuntimeError("AsyncComputeScheduler.record_compute called before begin_frame")
        record_fn(self._active_cmd)
        self._recorded = True

    def submit(self) -> None:
        """Submit the compute command buffer (async) or no-op (passthrough).

        In async mode the compute cmd is ended and submitted to the compute
        queue, signalling this frame's compute-done semaphore that the graphics
        submit waits on and this slot's retire fence. With no recorded work the
        semaphore is still signalled (empty submit) so the graphics wait never
        deadlocks.

        Raises:
            RuntimeError: in async mode, if no compute cmd is open (called
                before :meth:`begin_frame`, or a second time in one frame).
        """
        if not self.async_enabled:
            return
        cmd = self._active_cmd
        if cmd is None:
            raise RuntimeError(
                "AsyncComputeScheduler.submit called without an open compute command buffer"
            )
        # Close the recording first so a failure below cannot lead to a second
        # vkEndCommandBuffer / re-submit of the same buffer.
        self._active_cmd = None
        vk.vkEndCommandBuffer(cmd)
        fence = self._fences[self._frame]
        # Reset only now that this submit is about to re-signal it (see
        # begin_frame for why the reset is not done earlier).
        vk.vkResetFences(self._ctx.device, 1, [fence])
        submit_info = vk.VkSubmitInfo(
            commandBufferCount=1,
            pCommandBuffers=[cmd],
            signalSemaphoreCount=1,
            pSignalSemaphores=[self._semaphores[self._frame]],
        )
        vk.vkQueueSubmit(self._ctx.compute_queue, 1, [submit_info], fence)

    def graphics_wait_semaphores(self) -> tuple[list[Any], list[int]]:
        """Semaphores + per-semaphore wait stages for the graphics submit.

        Returns ``(semaphores, wait_stages)`` to merge into the frame's
        ``VkSubmitInfo`` so the graphics queue blocks on compute results before
        the indirect-draw / vertex / depth-sample stages consume them. Empty in
        passthrough mode (no extra wait: identical submit to today).

        In async mode the returned semaphore is signalled by :meth:`submit`;
        the graphics submit that waits on it must be queued after this frame's
        :meth:`submit`, otherwise the wait has no pending signal.
        """
        if not self.async_enabled:
            return [], []
        # DRAW_INDIRECT covers vkCmdDrawIndexedIndirect reading instance_count;
        # VERTEX_SHADER covers particle buffers read in the vertex stage;
        # FRAGMENT_SHADER covers light-grid / depth sampled in fragment.
        stage = (
            vk.VK_PIPELINE_STAGE_DRAW_INDIRECT_BIT
            | vk.VK_PIPELINE_STAGE_VERTEX_SHADER_BIT
            | vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT
        )
        return [self._semaphores[self._frame]], [stage]

    def destroy(self) -> None:
        """Release the async-path Vulkan objects. Safe to call more than once.

        First waits on every per-frame compute fence, because Vulkan forbids
        destroying a fence or semaphore still referenced by an unfinished
        submission. :meth:`submit` resets a fence only right before the
        vkQueueSubmit that re-signals it, so an unsignalled fence always has a
        pending submit and this wait terminates. Graphics submits that wait on
        these semaphores are presumably retired by the caller before shutdown
        (e.g. a device-idle wait) — NOTE(review): confirm. Nothing is created
        in passthrough mode, so there this only clears state.
        """
        if self._fences:
            vk.vkWaitForFences(
                self._ctx.device,
                len(self._fences),
                list(self._fences),
                vk.VK_TRUE,
                self._FENCE_TIMEOUT_NS,
            )
        for sem in self._semaphores:
            vk.vkDestroySemaphore(self._ctx.device, sem, None)
        self._semaphores.clear()
        for fence in self._fences:
            vk.vkDestroyFence(self._ctx.device, fence, None)
        self._fences.clear()
        self._cmd_buffers.clear()
        self._active_cmd = None
        if self._cmd_ctx is not None:
            self._cmd_ctx.destroy()
            self._cmd_ctx = None