"""Load cubemap textures from 6 face images or equirectangular HDR."""
import logging
from pathlib import Path
from typing import Any
import numpy as np
import vulkan as vk
from ..gpu.memory import create_sampler
__all__ = ["load_cubemap", "decode_rgbe", "equirect_to_cubemap", "gradient_cubemap_faces"]
log = logging.getLogger(__name__)
[docs]
def _read_rle_scanline(data: bytes, pos: int, width: int) -> tuple[np.ndarray, int]:
    """Decode one new-style RLE scanline body and return ``(channels, new_pos)``.

    ``pos`` must point just past the 4-byte ``02 02 hi lo`` marker. The four
    channels (R, G, B, E) are stored one after another, each as a sequence of
    runs (count byte > 128, followed by one value repeated ``count - 128``
    times) and literal blocks (count byte <= 128, followed by ``count`` raw
    bytes). The result has shape ``(4, width)`` and dtype ``uint8``.
    """
    n = len(data)
    channels = np.empty((4, width), dtype=np.uint8)
    for ch in range(4):
        x = 0
        while x < width:
            if pos >= n:
                raise ValueError("RGBE truncated mid-scanline")
            count = data[pos]
            pos += 1
            if count > 128:
                # Run of identical bytes.
                run_len = count - 128
                if pos >= n or x + run_len > width:
                    raise ValueError("RGBE run-length exceeds scanline width")
                channels[ch, x:x + run_len] = data[pos]
                pos += 1
                x += run_len
            else:
                # Literal block copied verbatim.
                if pos + count > n or x + count > width:
                    raise ValueError("RGBE literal block exceeds scanline width")
                channels[ch, x:x + count] = np.frombuffer(
                    data, dtype=np.uint8, count=count, offset=pos,
                )
                pos += count
                x += count
    return channels, pos


def decode_rgbe(data: bytes) -> np.ndarray:
    """Decode a Radiance RGBE / .hdr image to a float32 ``(H, W, 3)`` array.

    The stream must start with the ``#?`` magic, carry an ASCII header that is
    terminated by a blank line, and continue with a resolution line of the
    form ``-Y H +X W`` (``+Y`` and ``-X`` are honoured by flipping the decoded
    image). Scanlines may be flat (``W * 4`` raw bytes) or use the new-style
    per-channel run-length encoding introduced by a ``02 02 hi lo`` marker.
    No Pillow / imageio / OpenEXR dependency.

    Args:
        data: Raw bytes of the ``.hdr`` file.

    Returns:
        Linear RGB image as ``float32`` with shape ``(height, width, 3)``.
        Pixels whose exponent byte is 0 decode to black.

    Raises:
        ValueError: if the magic, header terminator or resolution line is
            missing or malformed, if the header declares a ``FORMAT`` other
            than ``32-bit_rle_rgbe`` (including XYZE, which would need a
            colour-space conversion this decoder does not perform), if the
            resolution line uses a rotated axis order, or if the pixel data
            is truncated or overruns a scanline.
    """
    n = len(data)
    if not data.startswith(b"#?"):
        raise ValueError("Not an RGBE/Radiance HDR file (missing #? magic)")

    # Parse ASCII header: terminated by a blank line.
    header_end = data.find(b"\n\n")
    if header_end < 0:
        raise ValueError("RGBE header is missing the blank-line terminator")
    header = data[:header_end].decode("ascii", errors="replace")
    # Files that omit FORMAT are tolerated; any declared format other than
    # RGBE (notably XYZE) is rejected instead of being decoded as if it were RGB.
    if "FORMAT=" in header and "32-bit_rle_rgbe" not in header:
        raise ValueError("Unsupported RGBE FORMAT (only 32-bit_rle_rgbe is supported)")
    pos = header_end + 2  # skip the blank line

    # Resolution string: e.g. ``-Y 512 +X 1024\n`` (Y first = top-down).
    res_end = data.find(b"\n", pos)
    if res_end < 0:
        raise ValueError("RGBE resolution string missing newline")
    res_line = data[pos:res_end].decode("ascii").strip().split()
    pos = res_end + 1
    if len(res_line) != 4:
        raise ValueError(f"Unexpected RGBE resolution line: {res_line!r}")
    y_axis, height_text, x_axis, width_text = res_line
    # Only row-major layouts are handled; ``+X W -Y H`` style (rotated) files
    # would otherwise be read with width and height swapped.
    if y_axis not in ("-Y", "+Y") or x_axis not in ("+X", "-X"):
        raise ValueError(f"Unsupported RGBE orientation: {res_line!r}")
    flip_y = y_axis == "+Y"  # default -Y means top-down (no flip)
    flip_x = x_axis == "-X"
    height = int(height_text)
    width = int(width_text)
    if height < 0 or width < 0:
        raise ValueError(f"Negative RGBE dimensions: {res_line!r}")

    out = np.zeros((height, width, 4), dtype=np.uint8)
    # The new RLE form is only defined for widths in [8, 0x7fff].
    rle_capable = 8 <= width <= 0x7FFF
    for y in range(height):
        if pos + 4 > n:
            raise ValueError(f"RGBE truncated at scanline {y}")
        # New RLE marker: 0x02 0x02 followed by hi/lo of width (must match).
        is_rle = (
            rle_capable
            and data[pos] == 2
            and data[pos + 1] == 2
            and ((data[pos + 2] << 8) | data[pos + 3]) == width
        )
        if is_rle:
            channels, pos = _read_rle_scanline(data, pos + 4, width)
            out[y] = channels.T
        else:
            # Flat scanline: width * 4 bytes back-to-back.
            if pos + width * 4 > n:
                raise ValueError(f"RGBE truncated at uncompressed scanline {y}")
            out[y] = np.frombuffer(data, dtype=np.uint8, count=width * 4, offset=pos).reshape(width, 4)
            pos += width * 4

    if flip_y:
        out = out[::-1]
    if flip_x:
        out = out[:, ::-1]

    # Decode RGBE -> linear float32 RGB: value = mantissa / 256 * 2 ** (e - 128).
    exponent = out[..., 3].astype(np.int32)
    # exponent 0 -> black
    mantissa_scale = np.where(exponent > 0, np.ldexp(1.0 / 256.0, exponent - 128), 0.0)
    rgb = out[..., :3].astype(np.float32) * mantissa_scale[..., None]
    return rgb.astype(np.float32)
[docs]
def gradient_cubemap_faces(
    top_rgb: tuple[float, float, float],
    bottom_rgb: tuple[float, float, float],
    size: int = 64,
) -> list[np.ndarray]:
    """Build six cube faces shaded by a vertical ``top`` -> ``bottom`` gradient.

    Every texel is coloured from the ``y`` component of its normalised
    world-space direction (evaluated at the texel centre), so the blend is
    continuous across face seams rather than following texel rows.

    The docstring this replaces notes that the same maths is duplicated in the
    web renderer (``web/renderer/web.py``) because that backend cannot import
    this module; keep the two in sync when changing the formula.

    Args:
        top_rgb: Colour at the +Y pole; only the first three components are used.
        bottom_rgb: Colour at the -Y pole; only the first three components are used.
        size: Edge length of each face in texels.

    Returns:
        Six C-contiguous ``(size, size, 4)`` float32 RGBA arrays (alpha = 1) in
        the order ``[+X, -X, +Y, -Y, +Z, -Z]``.
    """
    top = np.asarray(top_rgb, dtype=np.float32)[:3]
    bottom = np.asarray(bottom_rgb, dtype=np.float32)[:3]

    # Texel-centre coordinates in [-1, 1] along one face edge.
    centres = (np.arange(size, dtype=np.float32) + 0.5) / size * 2.0 - 1.0
    sx, sy = np.meshgrid(centres, centres)
    ones = np.ones_like(sx)

    # (x, y, z) components of the un-normalised direction for each face.
    components_per_face = (
        (ones, -sy, -sx),    # +X
        (-ones, -sy, sx),    # -X
        (sx, ones, sy),      # +Y
        (sx, -ones, -sy),    # -Y
        (sx, -sy, ones),     # +Z
        (-sx, -sy, -ones),   # -Z
    )

    def _shade(components: tuple[np.ndarray, np.ndarray, np.ndarray]) -> np.ndarray:
        direction = np.stack(list(components), axis=-1)
        direction = direction / np.linalg.norm(direction, axis=-1, keepdims=True)
        # Blend factor: 0 at the +Y pole, 1 at the -Y pole.
        blend = ((1.0 - direction[..., 1]) * 0.5)[..., None]
        face = np.empty((size, size, 4), dtype=np.float32)
        face[..., :3] = top * (1.0 - blend) + bottom * blend
        face[..., 3] = 1.0
        return np.ascontiguousarray(face)

    return [_shade(components) for components in components_per_face]
[docs]
def equirect_to_cubemap(equirect: np.ndarray, face_size: int = 256) -> list[np.ndarray]:
    """Project an equirectangular ``(H, W, 3|4)`` image onto 6 cube faces.

    Each output texel is assigned the direction through its *centre*
    (``(i + 0.5) / face_size`` mapped to ``[-1, 1]``), the same convention
    ``gradient_cubemap_faces`` uses, so ``face_size=1`` samples the middle of
    each face rather than a cube corner. The direction is normalised,
    converted to spherical angles and looked up in the source with bilinear
    interpolation (indices are clamped at the image border).

    Args:
        equirect: Source image; only the first three channels are read.
        face_size: Edge length of each output face in texels.

    Returns:
        Six C-contiguous ``(face_size, face_size, 4)`` float32 RGBA arrays
        (alpha = 1) in the order ``[+X, -X, +Y, -Y, +Z, -Z]``.

    Raises:
        ValueError: if ``equirect`` is not 3-dimensional with 3 or 4 channels,
            or has zero rows or columns.
    """
    if equirect.ndim != 3 or equirect.shape[2] not in (3, 4):
        raise ValueError(f"Expected (H, W, 3|4) equirect; got {equirect.shape}")
    src_h, src_w = equirect.shape[:2]
    if src_h == 0 or src_w == 0:
        raise ValueError(f"Equirect image is empty; got {equirect.shape}")
    src = equirect[..., :3].astype(np.float32)
    max_x = src_w - 1
    max_y = src_h - 1

    # Texel-centre coordinates in [-1, 1]; u varies along columns, v along rows.
    centres = (np.arange(face_size, dtype=np.float32) + 0.5) / face_size * 2.0 - 1.0
    u, v = np.meshgrid(centres, centres)
    one = np.ones_like(u)

    # Un-normalised world direction (x, y, z) for every texel of each face.
    face_dirs = [
        (one, -v, -u),    # +X
        (-one, -v, u),    # -X
        (u, one, v),      # +Y
        (u, -one, -v),    # -Y
        (u, -v, one),     # +Z
        (-u, -v, -one),   # -Z
    ]

    faces: list[np.ndarray] = []
    for x, y, z in face_dirs:
        norm = np.sqrt(x * x + y * y + z * z)
        x, y, z = x / norm, y / norm, z / norm

        # Spherical: phi = atan2(z, x) in [-pi, pi]; theta = acos(y) in [0, pi].
        phi = np.arctan2(z, x)
        theta = np.arccos(np.clip(y, -1.0, 1.0))

        # Equirect UVs in [0, 1], scaled to source pixel coordinates.
        fx_pix = (phi + np.pi) / (2.0 * np.pi) * max_x
        fy_pix = theta / np.pi * max_y

        # Bilinear taps; clip guards the indices against float rounding.
        x0 = np.clip(np.floor(fx_pix).astype(np.int32), 0, max_x)
        y0 = np.clip(np.floor(fy_pix).astype(np.int32), 0, max_y)
        x1 = np.minimum(x0 + 1, max_x)
        y1 = np.minimum(y0 + 1, max_y)
        wx = (fx_pix - x0)[..., None]
        wy = (fy_pix - y0)[..., None]

        upper = src[y0, x0] * (1 - wx) + src[y0, x1] * wx
        lower = src[y1, x0] * (1 - wx) + src[y1, x1] * wx

        # RGBA float32 with alpha 1.
        rgba = np.empty((face_size, face_size, 4), dtype=np.float32)
        rgba[..., :3] = upper * (1 - wy) + lower * wy
        rgba[..., 3] = 1.0
        faces.append(np.ascontiguousarray(rgba))
    return faces
def _load_equirect_hdr(path: str | Path, face_size: int = 256) -> tuple[list[bytes], int, int]:
    """Read an equirect ``.hdr`` file and turn it into six raw cube-face blobs.

    The file is decoded with ``decode_rgbe`` and projected with
    ``equirect_to_cubemap``; each resulting face array is serialised with
    ``tobytes()``.

    Returns:
        ``(face_blobs, face_size, face_size)`` where ``face_blobs`` holds the
        six faces in the order produced by ``equirect_to_cubemap``.
    """
    hdr_image = decode_rgbe(Path(path).read_bytes())
    face_blobs: list[bytes] = []
    for face in equirect_to_cubemap(hdr_image, face_size=face_size):
        face_blobs.append(face.tobytes())
    return face_blobs, face_size, face_size
[docs]
def load_cubemap(
    device: Any,
    physical_device: Any,
    queue: Any,
    cmd_pool: Any,
    face_paths: list[str] | None = None,
    hdr_path: str | None = None,
    colour: tuple[float, float, float] | None = None,
    face_size: int = 256,
    faces: list[np.ndarray] | None = None,
) -> tuple[Any, Any, Any, Any]:
    """Load a cubemap from arrays, 6 face images, an equirect HDR, or a solid colour.

    The first usable source wins, in this order: ``faces``, ``face_paths``
    (only when it has exactly 6 entries), ``hdr_path``, then a 64x64 solid
    colour. The data is uploaded as ``VK_FORMAT_R32G32B32A32_SFLOAT`` through a
    single staging buffer that holds all six faces, and the image is left in
    ``SHADER_READ_ONLY_OPTIMAL`` layout.

    Args:
        device: Vulkan logical device handle.
        physical_device: Vulkan physical device handle (used for memory-type lookup).
        queue: Queue the one-time upload command buffer is submitted to.
        cmd_pool: Command pool the upload command buffer is allocated from.
        face_paths: List of 6 image paths [+X, -X, +Y, -Y, +Z, -Z].
        hdr_path: Single equirectangular .hdr file (Radiance RGBE). Projected
            onto a 6-face cubemap on the CPU; ``face_size`` controls the
            per-face resolution (default 256).
        colour: Solid colour (r, g, b) in 0-1 range (fallback if no other
            source is given); defaults to ``(0.2, 0.3, 0.5)``.
        face_size: Resolution of each cube face when projecting from an HDR
            equirect. Ignored for every other source.
        faces: Six pre-synthesized ``(H, W, 4)`` RGBA arrays (e.g. from
            ``gradient_cubemap_faces``); converted to float32 before upload.

    Returns:
        (image_view, sampler, image, memory) tuple for the cubemap.

    Raises:
        ValueError: if ``faces`` does not contain exactly 6 entries, or any
            face's byte size is not ``width * height * 16``.
        RuntimeError: if ``vkCreateImage`` does not return ``VK_SUCCESS``.
    """
    from ..gpu.memory import _find_memory_type, create_buffer, upload_numpy

    ffi = vk.ffi

    if faces is None and face_paths and len(face_paths) != 6:
        # Previously a silent fall-through; keep the fallback but make it visible.
        log.warning("Ignoring face_paths: expected 6 paths, got %d", len(face_paths))

    if faces is not None:
        # Pre-synthesized float32 RGBA faces (e.g. gradient_cubemap_faces).
        if len(faces) != 6:
            raise ValueError(f"cubemap requires exactly 6 faces, got {len(faces)}")
        width, height = faces[0].shape[1], faces[0].shape[0]
        face_blobs = [np.ascontiguousarray(f, dtype=np.float32).tobytes() for f in faces]
    elif face_paths and len(face_paths) == 6:
        face_blobs, width, height = _load_face_images(face_paths)
    elif hdr_path:
        face_blobs, width, height = _load_equirect_hdr(hdr_path, face_size=face_size)
    else:
        # Solid colour fallback
        c = colour or (0.2, 0.3, 0.5)
        width, height = 64, 64
        pixel = np.array([c[0], c[1], c[2], 1.0], dtype=np.float32)
        solid = np.tile(pixel, width * height).astype(np.float32).tobytes()
        face_blobs = [solid] * 6

    fmt = vk.VK_FORMAT_R32G32B32A32_SFLOAT
    pixel_size = 16  # 4 floats * 4 bytes
    face_bytes = width * height * pixel_size

    # Validate on the CPU before any GPU object exists, so a bad face cannot
    # leak resources or write past the staging buffer.
    face_blobs = [b.tobytes() if isinstance(b, np.ndarray) else b for b in face_blobs]
    for idx, blob in enumerate(face_blobs):
        if len(blob) != face_bytes:
            raise ValueError(
                f"cubemap face {idx} has {len(blob)} bytes; expected {face_bytes} "
                f"for {width}x{height} RGBA float32"
            )
    if width != height:
        # The Vulkan spec requires cube-compatible images to be square.
        log.warning("Cubemap faces are not square (%dx%d)", width, height)

    def _all_faces_range() -> Any:
        # Colour aspect, single mip level, all six array layers.
        return vk.VkImageSubresourceRange(
            aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
            baseMipLevel=0,
            levelCount=1,
            baseArrayLayer=0,
            layerCount=6,
        )

    # Create cubemap image
    image_ci = ffi.new("VkImageCreateInfo*")
    image_ci.sType = vk.VK_STRUCTURE_TYPE_IMAGE_CREATE_INFO
    image_ci.imageType = vk.VK_IMAGE_TYPE_2D
    image_ci.format = fmt
    image_ci.extent.width = width
    image_ci.extent.height = height
    image_ci.extent.depth = 1
    image_ci.mipLevels = 1
    image_ci.arrayLayers = 6
    image_ci.samples = vk.VK_SAMPLE_COUNT_1_BIT
    image_ci.tiling = vk.VK_IMAGE_TILING_OPTIMAL
    image_ci.usage = vk.VK_IMAGE_USAGE_TRANSFER_DST_BIT | vk.VK_IMAGE_USAGE_SAMPLED_BIT
    image_ci.sharingMode = vk.VK_SHARING_MODE_EXCLUSIVE
    image_ci.initialLayout = vk.VK_IMAGE_LAYOUT_UNDEFINED
    image_ci.flags = vk.VK_IMAGE_CREATE_CUBE_COMPATIBLE_BIT
    image_out = ffi.new("VkImage*")
    result = vk._vulkan._callApi(
        vk._vulkan.lib.vkCreateImage,
        device,
        image_ci,
        ffi.NULL,
        image_out,
    )
    if result != vk.VK_SUCCESS:
        raise RuntimeError(f"vkCreateImage failed: {result}")
    image = image_out[0]

    # Allocate and bind device-local memory
    mem_req = vk.vkGetImageMemoryRequirements(device, image)
    mem_type = _find_memory_type(physical_device, mem_req.memoryTypeBits, vk.VK_MEMORY_PROPERTY_DEVICE_LOCAL_BIT)
    alloc_info = vk.VkMemoryAllocateInfo(
        allocationSize=mem_req.size,
        memoryTypeIndex=mem_type,
    )
    memory = vk.vkAllocateMemory(device, alloc_info, None)
    vk.vkBindImageMemory(device, image, memory, 0)

    # The staging buffer holds all six faces back to back. Copy commands are
    # only recorded here and run after vkQueueSubmit, so a single face-sized
    # buffer rewritten per face would leave every layer with the last face.
    staging_buf, staging_mem = create_buffer(
        device,
        physical_device,
        face_bytes * 6,
        vk.VK_BUFFER_USAGE_TRANSFER_SRC_BIT,
        vk.VK_MEMORY_PROPERTY_HOST_VISIBLE_BIT | vk.VK_MEMORY_PROPERTY_HOST_COHERENT_BIT,
    )
    try:
        # assumes upload_numpy writes the whole array starting at offset 0 of
        # the memory (the same assumption the per-face upload relied on)
        upload_numpy(device, staging_mem, np.frombuffer(b"".join(face_blobs), dtype=np.uint8))

        cmd_ai = vk.VkCommandBufferAllocateInfo(
            commandPool=cmd_pool,
            level=vk.VK_COMMAND_BUFFER_LEVEL_PRIMARY,
            commandBufferCount=1,
        )
        cmd = vk.vkAllocateCommandBuffers(device, cmd_ai)[0]
        try:
            vk.vkBeginCommandBuffer(
                cmd,
                vk.VkCommandBufferBeginInfo(
                    flags=vk.VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT,
                ),
            )

            # Transition entire cubemap to TRANSFER_DST
            to_transfer = vk.VkImageMemoryBarrier(
                srcAccessMask=0,
                dstAccessMask=vk.VK_ACCESS_TRANSFER_WRITE_BIT,
                oldLayout=vk.VK_IMAGE_LAYOUT_UNDEFINED,
                newLayout=vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
                image=image,
                subresourceRange=_all_faces_range(),
            )
            vk.vkCmdPipelineBarrier(
                cmd,
                vk.VK_PIPELINE_STAGE_TOP_OF_PIPE_BIT,
                vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
                0,
                0,
                None,
                0,
                None,
                1,
                [to_transfer],
            )

            # Copy each face from its own slice of the staging buffer.
            for face_idx in range(6):
                region = vk.VkBufferImageCopy(
                    bufferOffset=face_idx * face_bytes,
                    bufferRowLength=0,
                    bufferImageHeight=0,
                    imageSubresource=vk.VkImageSubresourceLayers(
                        aspectMask=vk.VK_IMAGE_ASPECT_COLOR_BIT,
                        mipLevel=0,
                        baseArrayLayer=face_idx,
                        layerCount=1,
                    ),
                    imageOffset=vk.VkOffset3D(x=0, y=0, z=0),
                    imageExtent=vk.VkExtent3D(width=width, height=height, depth=1),
                )
                vk.vkCmdCopyBufferToImage(
                    cmd, staging_buf, image, vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL, 1, [region]
                )

            # Transition to SHADER_READ
            to_shader_read = vk.VkImageMemoryBarrier(
                srcAccessMask=vk.VK_ACCESS_TRANSFER_WRITE_BIT,
                dstAccessMask=vk.VK_ACCESS_SHADER_READ_BIT,
                oldLayout=vk.VK_IMAGE_LAYOUT_TRANSFER_DST_OPTIMAL,
                newLayout=vk.VK_IMAGE_LAYOUT_SHADER_READ_ONLY_OPTIMAL,
                image=image,
                subresourceRange=_all_faces_range(),
            )
            vk.vkCmdPipelineBarrier(
                cmd,
                vk.VK_PIPELINE_STAGE_TRANSFER_BIT,
                vk.VK_PIPELINE_STAGE_FRAGMENT_SHADER_BIT,
                0,
                0,
                None,
                0,
                None,
                1,
                [to_shader_read],
            )
            vk.vkEndCommandBuffer(cmd)
            vk.vkQueueSubmit(
                queue,
                1,
                [
                    vk.VkSubmitInfo(
                        commandBufferCount=1,
                        pCommandBuffers=[cmd],
                    )
                ],
                None,
            )
            # Block until the copies finish so the staging buffer can be freed.
            vk.vkQueueWaitIdle(queue)
        finally:
            vk.vkFreeCommandBuffers(device, cmd_pool, 1, [cmd])
    finally:
        # Cleanup staging
        vk.vkDestroyBuffer(device, staging_buf, None)
        vk.vkFreeMemory(device, staging_mem, None)

    # Create cubemap image view
    view = vk.vkCreateImageView(
        device,
        vk.VkImageViewCreateInfo(
            image=image,
            viewType=vk.VK_IMAGE_VIEW_TYPE_CUBE,
            format=fmt,
            subresourceRange=_all_faces_range(),
        ),
        None,
    )
    sampler = create_sampler(device)
    log.debug("Cubemap loaded (%dx%d)", width, height)
    return view, sampler, image, memory
def _load_face_images(paths: list[str]) -> tuple[list[bytes], int, int]:
"""Load 6 face images and return (face_data_list, width, height)."""
try:
from PIL import Image
except ImportError:
raise ImportError("Pillow is required for cubemap face loading: pip install Pillow") from None
faces = []
width = height = 0
for path in paths:
img = Image.open(path).convert("RGBA")
if width == 0:
width, height = img.size
else:
img = img.resize((width, height))
# Convert to float32 RGBA
data = np.array(img, dtype=np.float32) / 255.0
faces.append(data.tobytes())
return faces, width, height