Source code for simvx.graphics.assets.cubemap_loader

"""Load cubemap textures from 6 face images or equirectangular HDR."""

import logging
from pathlib import Path
from typing import Any

import numpy as np
import vulkan as vk

from ..gpu.memory import create_sampler

# Names exported by a star-import of this module.
__all__ = ["load_cubemap", "decode_rgbe", "equirect_to_cubemap", "gradient_cubemap_faces"]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


def decode_rgbe(data: bytes) -> np.ndarray:
    """Decode a Radiance RGBE / ``.hdr`` image to a float32 ``(H, W, 3)`` array.

    Handles flat (uncompressed) scanlines and the run-length-encoded scanline
    form introduced by the ``0x02 0x02 <width-hi> <width-lo>`` marker. Pure
    NumPy: no Pillow / imageio / OpenEXR dependency.

    Args:
        data: Complete contents of the ``.hdr`` file.

    Returns:
        Linear RGB pixels as a float32 array of shape ``(H, W, 3)``. Rows are
        reversed when the resolution line starts with ``+Y`` and columns are
        reversed when its X axis is ``-X``.

    Raises:
        ValueError: If ``data`` lacks the ``#?`` magic, the blank-line header
            terminator or a four-token resolution line; if the header declares
            a FORMAT other than ``32-bit_rle_rgbe`` (this includes
            ``32-bit_rle_xyze``, whose XYZ primaries would otherwise be
            silently misread as RGB); or if the pixel data is truncated or
            overruns a scanline.
    """
    pos = 0
    n = len(data)
    if not data.startswith(b"#?"):
        raise ValueError("Not an RGBE/Radiance HDR file (missing #? magic)")

    # Parse ASCII header: terminated by a blank line.
    header_end = data.find(b"\n\n", pos)
    if header_end < 0:
        raise ValueError("RGBE header is missing the blank-line terminator")
    header = data[:header_end].decode("ascii", errors="replace")
    # Some files don't declare FORMAT at all; tolerate that and assume RGBE.
    # Any *declared* format other than RGBE is rejected, XYZE included: this
    # decoder performs no XYZ -> RGB conversion, so accepting it would yield
    # wrong colours without any error.
    if "FORMAT=" in header and "32-bit_rle_rgbe" not in header:
        raise ValueError("Unsupported RGBE FORMAT (only 32-bit_rle_rgbe is supported)")
    pos = header_end + 2  # skip the blank line

    # Resolution string: e.g. ``-Y 512 +X 1024\n`` (Y first = top-down).
    res_end = data.find(b"\n", pos)
    if res_end < 0:
        raise ValueError("RGBE resolution string missing newline")
    res_line = data[pos:res_end].decode("ascii").strip().split()
    pos = res_end + 1
    if len(res_line) != 4:
        raise ValueError(f"Unexpected RGBE resolution line: {res_line!r}")
    # Standard form: ``-Y H +X W``. Other orientations exist but are rare.
    flip_y = res_line[0] == "+Y"  # default -Y means top-down (no flip)
    flip_x = res_line[2] == "-X"
    height = int(res_line[1])
    width = int(res_line[3])

    # One zero-copy byte view of the whole file, sliced for block copies.
    raw = np.frombuffer(data, dtype=np.uint8)
    out = np.zeros((height, width, 4), dtype=np.uint8)
    for y in range(height):
        if pos + 4 > n:
            raise ValueError(f"RGBE truncated at scanline {y}")
        b0, b1, b2, b3 = data[pos], data[pos + 1], data[pos + 2], data[pos + 3]
        # New RLE marker: 0x02 0x02 followed by hi/lo of width (must match).
        if b0 == 2 and b1 == 2 and ((b2 << 8) | b3) == width and 8 <= width <= 0x7FFF:
            pos += 4
            # RLE scanlines are stored planar: all R, then G, then B, then E.
            scanline = np.empty((4, width), dtype=np.uint8)
            for ch in range(4):
                x = 0
                while x < width:
                    if pos >= n:
                        raise ValueError("RGBE truncated mid-scanline")
                    count = data[pos]
                    pos += 1
                    if count > 128:
                        # Run of identical bytes.
                        run_len = count - 128
                        if pos >= n or x + run_len > width:
                            raise ValueError("RGBE run-length exceeds scanline width")
                        scanline[ch, x:x + run_len] = data[pos]
                        pos += 1
                        x += run_len
                    else:
                        # Literal block.
                        if pos + count > n or x + count > width:
                            raise ValueError("RGBE literal block exceeds scanline width")
                        scanline[ch, x:x + count] = raw[pos:pos + count]
                        pos += count
                        x += count
            # Planar (4, W) -> interleaved (W, 4).
            out[y] = scanline.T
        else:
            # Old-style uncompressed scanline: width * 4 bytes back-to-back.
            row_bytes = width * 4
            if pos + row_bytes > n:
                raise ValueError(f"RGBE truncated at uncompressed scanline {y}")
            out[y] = raw[pos:pos + row_bytes].reshape(width, 4)
            pos += row_bytes

    if flip_y:
        out = out[::-1]
    if flip_x:
        out = out[:, ::-1]

    # Decode RGBE -> linear float32 RGB: value = mantissa * 2**(E - 128) / 256.
    rgbe = out.astype(np.float32)
    exponent = rgbe[..., 3]
    # Exponent 0 encodes black; mask it so those pixels come out as exact zeros.
    scale = np.where(
        exponent > 0,
        np.ldexp(1.0 / 256.0, (exponent - 128).astype(np.int32)),
        0.0,
    )
    return (rgbe[..., :3] * scale[..., None]).astype(np.float32)
def gradient_cubemap_faces(
    top_rgb: tuple[float, float, float],
    bottom_rgb: tuple[float, float, float],
    size: int = 64,
) -> list[np.ndarray]:
    """Build six cube faces shaded with a vertical ``top`` -> ``bottom`` gradient.

    The blend factor of every texel comes from the ``y`` component of its
    normalised world-space direction (0 at the +Y pole, 1 at the -Y pole), so
    the gradient is continuous across face seams instead of following texel
    rows.

    The maths is intended to mirror the web renderer's
    ``_gradient_cubemap_faces`` (``web/renderer/web.py``); the two are separate
    implementations because this module imports ``vulkan``, which the Pyodide
    web build cannot load.

    Args:
        top_rgb: Colour at the +Y pole; only the first three components are used.
        bottom_rgb: Colour at the -Y pole; only the first three components are used.
        size: Edge length of each face in texels.

    Returns:
        Six contiguous ``(size, size, 4)`` float32 RGBA arrays (alpha 1.0) in
        Vulkan order ``[+X, -X, +Y, -Y, +Z, -Z]``.
    """
    top = np.asarray(top_rgb, dtype=np.float32)[:3]
    bottom = np.asarray(bottom_rgb, dtype=np.float32)[:3]

    # Texel-centre coordinates in (-1, 1).
    centres = (np.arange(size, dtype=np.float32) + 0.5) / size * 2.0 - 1.0
    col, row = np.meshgrid(centres, centres)
    unit = np.ones_like(col)

    # (x, y, z) direction components for every texel of each face.
    face_axes = (
        (unit, -row, -col),   # +X
        (-unit, -row, col),   # -X
        (col, unit, row),     # +Y
        (col, -unit, -row),   # -Y
        (col, -row, unit),    # +Z
        (-col, -row, -unit),  # -Z
    )

    def shade(axes: tuple[np.ndarray, np.ndarray, np.ndarray]) -> np.ndarray:
        # Normalise the directions, then blend by their vertical component.
        direction = np.stack(axes, axis=-1)
        direction = direction / np.linalg.norm(direction, axis=-1, keepdims=True)
        blend = ((1.0 - direction[..., 1]) * 0.5)[..., None]
        colour = top * (1.0 - blend) + bottom * blend
        texels = np.empty((size, size, 4), dtype=np.float32)
        texels[..., :3] = colour
        texels[..., 3] = 1.0
        return np.ascontiguousarray(texels)

    return [shade(axes) for axes in face_axes]
def equirect_to_cubemap(equirect: np.ndarray, face_size: int = 256) -> list[np.ndarray]:
    """Resample an equirectangular image onto the six faces of a cubemap.

    Every face texel is turned into a normalised world direction, converted to
    spherical coordinates and looked up in ``equirect`` with bilinear
    filtering. Neighbour lookups are clamped at the image border. The work is
    vectorised with NumPy over all texels of a face.

    Args:
        equirect: ``(H, W, 3)`` or ``(H, W, 4)`` image; only the first three
            channels are sampled.
        face_size: Edge length of each output face in texels.

    Returns:
        Six contiguous ``(face_size, face_size, 4)`` float32 RGBA arrays
        (alpha 1.0) in Vulkan order ``[+X, -X, +Y, -Y, +Z, -Z]``.

    Raises:
        ValueError: If ``equirect`` is not a 3-D array with 3 or 4 channels.
    """
    if equirect.ndim != 3 or equirect.shape[2] not in (3, 4):
        raise ValueError(f"Expected (H, W, 3|4) equirect; got {equirect.shape}")

    src_h, src_w = equirect.shape[:2]
    pixels = equirect[..., :3].astype(np.float32)

    # Face-local coordinates spanning [-1, 1] in both directions.
    ramp = np.linspace(-1.0, 1.0, face_size, dtype=np.float32)
    u, v = np.meshgrid(ramp, ramp)
    one = np.ones_like(u)

    # Un-normalised (x, y, z) direction components per face.
    face_axes = (
        (one, -v, -u),    # +X
        (-one, -v, u),    # -X
        (u, one, v),      # +Y
        (u, -one, -v),    # -Y
        (u, -v, one),     # +Z
        (-u, -v, -one),   # -Z
    )

    def sample_face(dx: np.ndarray, dy: np.ndarray, dz: np.ndarray) -> np.ndarray:
        length = np.sqrt(dx * dx + dy * dy + dz * dz)
        dx, dy, dz = dx / length, dy / length, dz / length

        # Longitude in [-pi, pi] and polar angle in [0, pi].
        longitude = np.arctan2(dz, dx)
        polar = np.arccos(np.clip(dy, -1.0, 1.0))

        # Continuous source pixel coordinates.
        col = (longitude + np.pi) / (2.0 * np.pi) * (src_w - 1)
        row = polar / np.pi * (src_h - 1)

        left = np.floor(col).astype(np.int32)
        top = np.floor(row).astype(np.int32)
        right = np.minimum(left + 1, src_w - 1)
        bottom = np.minimum(top + 1, src_h - 1)
        tx = (col - left)[..., None]
        ty = (row - top)[..., None]

        # Bilinear blend: horizontally within each row pair, then vertically.
        upper = pixels[top, left] * (1 - tx) + pixels[top, right] * tx
        lower = pixels[bottom, left] * (1 - tx) + pixels[bottom, right] * tx
        blended = upper * (1 - ty) + lower * ty

        texels = np.empty((face_size, face_size, 4), dtype=np.float32)
        texels[..., :3] = blended
        texels[..., 3] = 1.0
        return np.ascontiguousarray(texels)

    return [sample_face(*axes) for axes in face_axes]
def _load_equirect_hdr(path: str | Path, face_size: int = 256) -> tuple[list[bytes], int, int]:
    """Read an equirectangular ``.hdr`` file and convert it to cubemap face bytes.

    The file is decoded with ``decode_rgbe`` and projected with
    ``equirect_to_cubemap``; each resulting face is serialised with
    ``tobytes()``.

    Returns:
        ``(face_bytes, width, height)`` where ``face_bytes`` holds the six
        faces in the order produced by ``equirect_to_cubemap`` and both
        dimensions equal ``face_size``.
    """
    raw = Path(path).read_bytes()
    cube = equirect_to_cubemap(decode_rgbe(raw), face_size=face_size)
    payload = [face.tobytes() for face in cube]
    return payload, face_size, face_size
def load_cubemap(
    device: Any,
    physical_device: Any,
    queue: Any,
    cmd_pool: Any,
    face_paths: list[str] | None = None,
    hdr_path: str | None = None,
    colour: tuple[float, float, float] | None = None,
    face_size: int = 256,
    faces: list[np.ndarray] | None = None,
) -> tuple[Any, Any, Any, Any]:
    """Create a GPU cubemap from pre-built faces, 6 images, an equirect HDR, or a solid colour.

    The first usable source wins, in this order: ``faces``, ``face_paths``
    (only when it holds exactly 6 entries), ``hdr_path``, then the solid
    ``colour`` fallback. All sources are uploaded as RGBA float32
    (``VK_FORMAT_R32G32B32A32_SFLOAT``).

    Args:
        device: Vulkan logical device handle.
        physical_device: Vulkan physical device, used to pick memory types.
        queue: Queue the one-shot upload command buffer is submitted to.
        cmd_pool: Command pool the upload command buffer is allocated from.
        face_paths: List of 6 image paths ``[+X, -X, +Y, -Y, +Z, -Z]``.
        hdr_path: Single equirectangular ``.hdr`` file (Radiance RGBE),
            projected onto a 6-face cubemap on the CPU.
        colour: Solid colour ``(r, g, b)`` in 0-1 range, used when no other
            source is given; defaults to ``(0.2, 0.3, 0.5)``.
        face_size: Resolution of each cube face when projecting from an HDR
            equirect. Ignored for every other source.
        faces: 6 pre-synthesized RGBA faces (e.g. from
            ``gradient_cubemap_faces``), each ``(H, W, 4)``; converted to
            float32. Dimensions are taken from the first face.

    Returns:
        ``(image_view, sampler, image, memory)`` tuple for the cubemap.

    Raises:
        ValueError: If ``faces`` does not contain exactly 6 entries, or any
            face does not hold ``width * height`` RGBA float32 texels.
        RuntimeError: If ``vkCreateImage`` fails.
    """
    from ..gpu.memory import _find_memory_type, create_buffer, upload_numpy

    ffi = vk.ffi

    # --- Resolve the six faces to raw RGBA float32 bytes --------------------
    if faces is not None:
        # Pre-synthesized float32 RGBA faces (e.g. gradient_cubemap_faces).
        if len(faces) != 6:
            raise ValueError(f"cubemap requires exactly 6 faces, got {len(faces)}")
        width, height = faces[0].shape[1], faces[0].shape[0]
        face_blobs = [np.ascontiguousarray(f, dtype=np.float32).tobytes() for f in faces]
    elif face_paths and len(face_paths) == 6:
        face_blobs, width, height = _load_face_images(face_paths)
    elif hdr_path:
        face_blobs, width, height = _load_equirect_hdr(hdr_path, face_size=face_size)
    else:
        # Solid colour fallback.
        c = colour or (0.2, 0.3, 0.5)
        width, height = 64, 64
        pixel = np.array([c[0], c[1], c[2], 1.0], dtype=np.float32)
        solid = np.tile(pixel, width * height).astype(np.float32).tobytes()
        face_blobs = [solid] * 6

    fmt = vk.VK_FORMAT_R32G32B32A32_SFLOAT
    pixel_size = 16  # 4 floats * 4 bytes
    face_nbytes = width * height * pixel_size

    # The per-face staging offsets below rely on every face having exactly
    # this size, so check before any GPU resource is created.
    for idx, blob in enumerate(face_blobs):
        if len(blob) != face_nbytes:
            raise ValueError(
                f"cubemap face {idx} has {len(blob)} bytes; expected {face_nbytes} "
                f"({width}x{height} RGBA float32)"
            )

    # --- Create cubemap image -------------------------------------------------
    image_ci = ffi.new("VkImageCreateInfo*")
    image_ci.sType = vk.VK_STRUCTURE_TYPE_IMAGE_CREATE_INFO
    image_ci.imageType = vk.VK_IMAGE_TYPE_2D
    image_ci.format = fmt
    image_ci.extent.width = width
    image_ci.extent.height = height
    image_ci.extent.depth = 1
    image_ci.mipLevels = 1
    image_ci.arrayLayers = 6
    image_ci.samples = vk.VK_SAMPLE_COUNT_1_BIT
    image_ci.tiling = vk.VK_IMAGE_TILING_OPTIMAL
    image_ci.usage = vk.VK_IMAGE_USAGE_TRANSFER_DST_BIT | vk.VK_IMAGE_USAGE_SAMPLED_BIT
    image_ci.sharingMode = vk.VK_SHARING_MODE_EXCLUSIVE
    image_ci.initialLayout = vk.VK_IMAGE_LAYOUT_UNDEFINED
    image_ci.flags = vk.VK_IMAGE_CREATE_CUBE_COMPATIBLE_BIT

    image_out = ffi.new("VkImage*")
    result = vk._vulkan._callApi(
        vk._vulkan.lib.vkCreateImage,
        device,
        image_ci,
        ffi.NULL,
        image_out,
    )
    if result != vk.VK_SUCCESS:
        raise RuntimeError(f"vkCreateImage failed: {result}")
    image = image_out[0]

    # --- Allocate and bind device-local memory --------------------------------
    mem_req = vk.vkGetImageMemoryRequirements(device, image)
    mem_type = _find_memory_type(
        physical_device, mem_req.memoryTypeBits, vk.VK_MEMORY_PROPERTY_DEVICE_LOCAL_BIT
    )
    alloc_info = vk.VkMemoryAllocateInfo(
        allocationSize=mem_req.size,
        memoryTypeIndex=mem_type,
    )
    memory = vk.vkAllocateMemory(device, alloc_info, None)
    vk.vkBindImageMemory(device, image, memory, 0)

    # --- Stage all six faces ----------------------------------------------------
    # The copy commands below are only *recorded* here and run after
    # vkQueueSubmit, so every face must still be present in the staging buffer
    # at that point. A single face-sized buffer rewritten per face would leave
    # all six copies reading the last face; hence one buffer holding all six
    # faces back to back, addressed by offset.
    staging_buf, staging_mem = create_buffer(
        device,
        physical_device,
        face_nbytes * 6,
        vk.VK_BUFFER_USAGE_TRANSFER_SRC_BIT,
        vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
    )
    upload_numpy(device, staging_mem, np.frombuffer(b"".join(face_blobs), dtype=np.uint8))

    # --- Record: UNDEFINED -> TRANSFER_DST, copy faces, -> SHADER_READ ----------
    cmd_ai = vk.VkCommandBufferAllocateInfo(
        commandPool=cmd_pool,
        level=vk.VK_COMMAND_BUFFER_LEVEL_PRIMARY,
        commandBufferCount=1,
    )
    cmd = vk.vkAllocateCommandBuffers(device, cmd_ai)[0]
    vk.vkBeginCommandBuffer(
        cmd,
        vk.VkCommandBufferBeginInfo(
            flags=vk.VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT,
        ),
    )

    # Transition entire cubemap to TRANSFER_DST.
    to_transfer = vk.VkImageMemoryBarrier(
        srcAccessMask=0,
        dstAccessMask=vk.VK_ACCESS_TRANSFER_WRITE_BIT,
        oldLayout=vk.VK_IMAGE_LAYOUT_UNDEFINED,
        newLayout=vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
        image=image,
        subresourceRange=vk.VkImageSubresourceRange(
            aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
            baseMipLevel=0,
            levelCount=1,
            baseArrayLayer=0,
            layerCount=6,
        ),
    )
    vk.vkCmdPipelineBarrier(
        cmd,
        vk.VK_PIPELINE_STAGE_TOP_OF_PIPE_BIT,
        vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
        0, 0, None, 0, None, 1, [to_transfer],
    )

    for face_idx in range(6):
        # Copy this face's slice of the staging buffer into its array layer.
        region = vk.VkBufferImageCopy(
            bufferOffset=face_idx * face_nbytes,
            bufferRowLength=0,
            bufferImageHeight=0,
            imageSubresource=vk.VkImageSubresourceLayers(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                mipLevel=0,
                baseArrayLayer=face_idx,
                layerCount=1,
            ),
            imageOffset=vk.VkOffset3D(x=0, y=0, z=0),
            imageExtent=vk.VkExtent3D(width=width, height=height, depth=1),
        )
        vk.vkCmdCopyBufferToImage(
            cmd, staging_buf, image, vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL, 1, [region]
        )

    # Transition to SHADER_READ.
    to_shader_read = vk.VkImageMemoryBarrier(
        srcAccessMask=vk.VK_ACCESS_TRANSFER_WRITE_BIT,
        dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT,
        oldLayout=vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
        newLayout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
        image=image,
        subresourceRange=vk.VkImageSubresourceRange(
            aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
            baseMipLevel=0,
            levelCount=1,
            baseArrayLayer=0,
            layerCount=6,
        ),
    )
    vk.vkCmdPipelineBarrier(
        cmd,
        vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
        vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT,
        0, 0, None, 0, None, 1, [to_shader_read],
    )

    vk.vkEndCommandBuffer(cmd)
    vk.vkQueueSubmit(
        queue,
        1,
        [
            vk.VkSubmitInfo(
                commandBufferCount=1,
                pCommandBuffers=[cmd],
            )
        ],
        None,
    )
    # Block until the copies have executed; only then may staging be released.
    vk.vkQueueWaitIdle(queue)
    vk.vkFreeCommandBuffers(device, cmd_pool, 1, [cmd])

    # Cleanup staging.
    vk.vkDestroyBuffer(device, staging_buf, None)
    vk.vkFreeMemory(device, staging_mem, None)

    # --- Create cubemap image view and sampler -----------------------------------
    view = vk.vkCreateImageView(
        device,
        vk.VkImageViewCreateInfo(
            image=image,
            viewType=vk.VK_IMAGE_VIEW_TYPE_CUBE,
            format=fmt,
            subresourceRange=vk.VkImageSubresourceRange(
                aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                baseMipLevel=0,
                levelCount=1,
                baseArrayLayer=0,
                layerCount=6,
            ),
        ),
        None,
    )

    sampler = create_sampler(device)
    log.debug("Cubemap loaded (%dx%d)", width, height)
    return view, sampler, image, memory
def _load_face_images(paths: list[str]) -> tuple[list[bytes], int, int]: """Load 6 face images and return (face_data_list, width, height).""" try: from PIL import Image except ImportError: raise ImportError("Pillow is required for cubemap face loading: pip install Pillow") from None faces = [] width = height = 0 for path in paths: img = Image.open(path).convert("RGBA") if width == 0: width, height = img.size else: img = img.resize((width, height)) # Convert to float32 RGBA data = np.array(img, dtype=np.float32) / 255.0 faces.append(data.tobytes()) return faces, width, height