"""Binary serialization of 3D render state for WebSocket streaming.
Serializes a complete 3D scene frame into compact binary for transmission
to a browser-side WebGPU renderer.
Wire format per frame::
HEADER (20 bytes):
frame_id(u32) flags(u32) viewport_count(u32) light_count(u32) draw_group_count(u32)
Per VIEWPORT (144 bytes = 16-byte rect + 2 × 64-byte matrices):
x(u32) y(u32) w(u32) h(u32) + view_mat(16×f32) + proj_mat(16×f32)
RESOURCES (only if flags bit 0 set):
mesh_count(u32) + texture_count(u32) + material_count(u32)
Per mesh: mesh_id(u32) vertex_count(u32) index_count(u32)
vertex_bytes(vertex_count × 32)
index_bytes(index_count × 4)
Per texture: tex_id(u32) width(u32) height(u32) pixel_bytes(width × height × 4)
material_bytes(material_count × MATERIAL_DTYPE.itemsize)
LIGHTS (light_count × LIGHT_DTYPE.itemsize bytes):
Raw LIGHT_DTYPE data
DRAW GROUPS (draw_group_count entries):
Per group: mesh_id(u32) index_count(u32) instance_count(u32) pass_type(u32)
transform_bytes(instance_count × 64) -- 4×4 model matrix per instance, f32
material_ids(instance_count × u32)
pass_type: 0=OPAQUE, 1=DOUBLE_SIDED, 2=TRANSPARENT
POST-PROCESS (only if flags bit 1 set, 16 bytes):
bloom_enabled(u32) bloom_threshold(f32) bloom_intensity(f32) bloom_soft_knee(f32)
"""
from __future__ import annotations
import struct
from typing import Any
import numpy as np
from .._types import LIGHT_DTYPE, MATERIAL_DTYPE, VERTEX_DTYPE
__all__ = ["Scene3DSerializer"]
# Bit masks for the header's ``flags`` word.
FLAG_HAS_RESOURCES = 0x1  # bit 0: a RESOURCES section is present
FLAG_HAS_POST_PROCESS = 0x2  # bit 1: a POST-PROCESS section is present

# Pre-compiled little-endian layouts for every fixed-size record in a frame.

# Frame header -- five u32: frame_id, flags, viewport_count, light_count, draw_group_count.
_HEADER = struct.Struct("<5I")

# Viewport rectangle -- four u32: x, y, w, h.
_VP_RECT = struct.Struct("<4I")

# Resource section counts -- three u32: mesh_count, texture_count, material_count.
_RES_COUNTS = struct.Struct("<3I")

# Per-mesh header -- three u32: mesh_id, vertex_count, index_count.
_MESH_HEADER = struct.Struct("<3I")

# Per-texture header -- three u32: tex_id, width, height.
_TEX_HEADER = struct.Struct("<3I")

# Per-draw-group header -- four u32: mesh_id, index_count, instance_count, pass_type.
_GROUP_HEADER = struct.Struct("<4I")

# Post-process record -- bloom_enabled (u32), then bloom_threshold,
# bloom_intensity and bloom_soft_knee (f32 each).
_POST_PROCESS = struct.Struct("<I3f")

# Size in bytes of one 4x4 matrix of f32 values (16 elements, 4 bytes each).
_MAT4_BYTES = 16 * 4
[docs]
class Scene3DSerializer:
    """Serialize 3D scene state into binary frames for WebSocket streaming.

    A frame is written in this order: header, viewports, optional resources,
    lights, draw groups, optional post-process record.  Fixed-size headers are
    packed little-endian via ``struct``; array payloads are emitted with
    NumPy's ``tobytes()`` in the arrays' native byte order.
    """

    @staticmethod
    def serialize_frame(
        frame_id: int,
        viewports: list[dict[str, Any]],
        lights: np.ndarray,
        draw_groups: list[dict[str, Any]],
        resources: dict[str, Any] | None = None,
        post_process: dict[str, Any] | None = None,
    ) -> bytes:
        """Serialize a 3D scene frame into compact binary.

        Args:
            frame_id: Monotonic frame counter.
            viewports: List of dicts with keys ``x, y, width, height, view_matrix, proj_matrix``
                (each matrix must hold exactly 16 values, e.g. a 4x4 float32 array).
            lights: Structured numpy array with dtype ``LIGHT_DTYPE``.  ``None`` or an
                empty array writes a light count of zero and no light payload.
            draw_groups: List of dicts with keys ``mesh_id, index_count, transforms, material_ids``
                and optionally ``pass_type`` (defaults to 0), where transforms is
                (N, 4, 4) float32 and material_ids is (N,) uint32.
            resources: Optional dict with keys ``meshes, textures, materials`` for initial upload.
                meshes: list of (mesh_id, vertices, indices).
                textures: list of (tex_id, width, height, pixels).
                materials: structured numpy array with dtype ``MATERIAL_DTYPE``.
            post_process: Optional dict with bloom settings:
                bloom_enabled (bool), bloom_threshold (float),
                bloom_intensity (float), bloom_soft_knee (float).

        Returns:
            Compact binary frame ready for WebSocket transmission.

        Raises:
            ValueError: If a viewport matrix does not hold 16 values, a texture's
                pixel data is not ``width * height * 4`` bytes, or a draw group's
                ``transforms`` does not hold one 4x4 matrix per material id.  Any of
                these would otherwise yield a frame the receiver cannot parse.
        """
        flags = 0
        if resources is not None:
            flags |= FLAG_HAS_RESOURCES
        if post_process is not None:
            flags |= FLAG_HAS_POST_PROCESS

        if lights is None:
            light_data = np.empty(0, dtype=LIGHT_DTYPE)
        else:
            light_data = np.asarray(lights, dtype=LIGHT_DTYPE)

        # Counts are taken from ``.size`` of the converted arrays so that every
        # count written to the wire matches the number of elements that follow.
        parts: list[bytes] = [
            _HEADER.pack(frame_id, flags, len(viewports), light_data.size, len(draw_groups))
        ]

        # Viewports
        for vp in viewports:
            parts.append(_VP_RECT.pack(vp["x"], vp["y"], vp["width"], vp["height"]))
            parts.append(Scene3DSerializer._mat4_bytes(vp["view_matrix"], "view_matrix"))
            parts.append(Scene3DSerializer._mat4_bytes(vp["proj_matrix"], "proj_matrix"))

        # Resources (optional)
        if resources is not None:
            meshes = resources.get("meshes", [])
            textures = resources.get("textures", [])
            materials = np.asarray(
                resources.get("materials", np.empty(0, dtype=MATERIAL_DTYPE)),
                dtype=MATERIAL_DTYPE,
            )
            parts.append(_RES_COUNTS.pack(len(meshes), len(textures), materials.size))

            for mesh_id, vertices, indices in meshes:
                verts = np.asarray(vertices, dtype=VERTEX_DTYPE)
                idxs = np.asarray(indices, dtype=np.uint32)
                # ``.size`` (not ``len``) so a (T, 3) triangle array reports T * 3 indices.
                parts.append(_MESH_HEADER.pack(mesh_id, verts.size, idxs.size))
                parts.append(verts.tobytes())
                parts.append(idxs.tobytes())

            for tex_id, width, height, pixels in textures:
                texels = np.asarray(pixels, dtype=np.uint8)
                expected_bytes = width * height * 4
                if texels.size != expected_bytes:
                    raise ValueError(
                        f"texture {tex_id}: expected {expected_bytes} pixel bytes "
                        f"for {width}x{height}x4, got {texels.size}"
                    )
                parts.append(_TEX_HEADER.pack(tex_id, width, height))
                parts.append(texels.tobytes())

            parts.append(materials.tobytes())

        # Lights
        if light_data.size > 0:
            parts.append(light_data.tobytes())

        # Draw groups
        for group in draw_groups:
            transforms = np.asarray(group["transforms"], dtype=np.float32)
            mat_ids = np.asarray(group["material_ids"], dtype=np.uint32)
            instance_count = mat_ids.size
            if transforms.nbytes != instance_count * _MAT4_BYTES:
                raise ValueError(
                    f"draw group for mesh {group['mesh_id']}: transforms has shape "
                    f"{transforms.shape} but material_ids has {instance_count} entries"
                )
            parts.append(
                _GROUP_HEADER.pack(
                    group["mesh_id"], group["index_count"], instance_count, group.get("pass_type", 0)
                )
            )
            parts.append(transforms.tobytes())
            parts.append(mat_ids.tobytes())

        # Post-process (16 bytes, after draw groups)
        if post_process is not None:
            parts.append(
                _POST_PROCESS.pack(
                    1 if post_process.get("bloom_enabled", False) else 0,
                    float(post_process.get("bloom_threshold", 1.0)),
                    float(post_process.get("bloom_intensity", 0.8)),
                    float(post_process.get("bloom_soft_knee", 0.5)),
                )
            )

        return b"".join(parts)

    @staticmethod
    def deserialize_frame(data: bytes) -> dict[str, Any]:
        """Deserialize a binary frame back into structured data (for testing).

        Args:
            data: A frame as produced by :meth:`serialize_frame`.

        Returns:
            Dict with keys: frame_id, flags, viewports, lights, draw_groups, and,
            when the corresponding flag bit is set, resources and post_process.
            All returned arrays are independent copies of ``data``.

        Raises:
            ValueError: If ``data`` ends before an array payload announced by a
                preceding count is complete.
            struct.error: If ``data`` ends inside a fixed-size header record.
        """
        read = Scene3DSerializer._read_array
        mat4_floats = _MAT4_BYTES // np.dtype(np.float32).itemsize

        off = 0
        frame_id, flags, vp_count, light_count, group_count = _HEADER.unpack_from(data, off)
        off += _HEADER.size

        # Viewports
        viewports = []
        for _ in range(vp_count):
            x, y, w, h = _VP_RECT.unpack_from(data, off)
            off += _VP_RECT.size
            view_mat, off = read(data, np.float32, mat4_floats, off)
            proj_mat, off = read(data, np.float32, mat4_floats, off)
            viewports.append(
                {
                    "x": x,
                    "y": y,
                    "width": w,
                    "height": h,
                    "view_matrix": view_mat.reshape(4, 4),
                    "proj_matrix": proj_mat.reshape(4, 4),
                }
            )

        # Resources
        resources = None
        if flags & FLAG_HAS_RESOURCES:
            mesh_count, tex_count, mat_count = _RES_COUNTS.unpack_from(data, off)
            off += _RES_COUNTS.size

            meshes = []
            for _ in range(mesh_count):
                mesh_id, vert_count, idx_count = _MESH_HEADER.unpack_from(data, off)
                off += _MESH_HEADER.size
                verts, off = read(data, VERTEX_DTYPE, vert_count, off)
                idxs, off = read(data, np.uint32, idx_count, off)
                meshes.append((mesh_id, verts, idxs))

            textures = []
            for _ in range(tex_count):
                tex_id, tw, th = _TEX_HEADER.unpack_from(data, off)
                off += _TEX_HEADER.size
                pixels, off = read(data, np.uint8, tw * th * 4, off)
                textures.append((tex_id, tw, th, pixels))

            materials, off = read(data, MATERIAL_DTYPE, mat_count, off)
            resources = {"meshes": meshes, "textures": textures, "materials": materials}

        # Lights
        lights, off = read(data, LIGHT_DTYPE, light_count, off)

        # Draw groups
        draw_groups = []
        for _ in range(group_count):
            mesh_id, idx_count, inst_count, pass_type = _GROUP_HEADER.unpack_from(data, off)
            off += _GROUP_HEADER.size
            transforms, off = read(data, np.float32, inst_count * mat4_floats, off)
            mat_ids, off = read(data, np.uint32, inst_count, off)
            draw_groups.append(
                {
                    "mesh_id": mesh_id,
                    "index_count": idx_count,
                    "transforms": transforms.reshape(inst_count, 4, 4),
                    "material_ids": mat_ids,
                    "pass_type": pass_type,
                }
            )

        # Post-process (if flag set)
        post_process = None
        if flags & FLAG_HAS_POST_PROCESS:
            bloom_en, bloom_thresh, bloom_int, bloom_knee = _POST_PROCESS.unpack_from(data, off)
            off += _POST_PROCESS.size
            post_process = {
                "bloom_enabled": bool(bloom_en),
                "bloom_threshold": bloom_thresh,
                "bloom_intensity": bloom_int,
                "bloom_soft_knee": bloom_knee,
            }

        result: dict[str, Any] = {
            "frame_id": frame_id,
            "flags": flags,
            "viewports": viewports,
            "lights": lights,
            "draw_groups": draw_groups,
        }
        if resources is not None:
            result["resources"] = resources
        if post_process is not None:
            result["post_process"] = post_process
        return result

    @staticmethod
    def _mat4_bytes(matrix: Any, label: str) -> bytes:
        """Return *matrix* as float32 bytes, raising ValueError unless it holds 16 values."""
        mat = np.asarray(matrix, dtype=np.float32)
        if mat.nbytes != _MAT4_BYTES:
            raise ValueError(f"{label} must hold 16 float32 values (4x4), got shape {mat.shape}")
        return mat.tobytes()

    @staticmethod
    def _read_array(data: bytes, dtype: Any, count: int, offset: int) -> tuple[np.ndarray, int]:
        """Copy *count* items of *dtype* out of *data* at *offset*; return (array, end offset)."""
        dt = np.dtype(dtype)
        end = offset + count * dt.itemsize
        if end > len(data):
            raise ValueError(
                f"truncated frame: need {end} bytes to read {count} x {dt.itemsize}-byte "
                f"items at offset {offset}, but only {len(data)} bytes are available"
            )
        if count == 0:
            # Avoid handing frombuffer a zero-length request at the end of the buffer.
            return np.empty(0, dtype=dt), end
        # frombuffer with count/offset views the input without slicing it first,
        # so the single .copy() below is the only copy made.
        return np.frombuffer(data, dtype=dt, count=count, offset=offset).copy(), end