# Source code for simvx.graphics.streaming.scene3d_serializer

"""Binary serialization of 3D render state for WebSocket streaming.

Serializes a complete 3D scene frame into compact binary for transmission
to a browser-side WebGPU renderer.

Wire format per frame::

    HEADER (20 bytes):
        frame_id(u32) flags(u32) viewport_count(u32) light_count(u32) draw_group_count(u32)

    Per VIEWPORT (144 bytes):
        x(u32) y(u32) w(u32) h(u32) + view_mat(16×f32) + proj_mat(16×f32)

    RESOURCES (only if flags bit 0 set):
        mesh_count(u32) + texture_count(u32) + material_count(u32)
        Per mesh:   mesh_id(u32) vertex_count(u32) index_count(u32)
                    vertex_bytes(vertex_count × 32)
                    index_bytes(index_count × 4)
        Per texture: tex_id(u32) width(u32) height(u32) pixel_bytes(width × height × 4)
        material_bytes(material_count × MATERIAL_DTYPE.itemsize)

    LIGHTS (light_count × LIGHT_DTYPE.itemsize bytes):
        Raw LIGHT_DTYPE data

    DRAW GROUPS (draw_group_count entries):
        Per group: mesh_id(u32) index_count(u32) instance_count(u32) pass_type(u32)
        transform_bytes(instance_count × 64)  -- 4×4 model matrix per instance, f32
        material_ids(instance_count × u32)
        pass_type: 0=OPAQUE, 1=DOUBLE_SIDED, 2=TRANSPARENT

    POST-PROCESS (only if flags bit 1 set, 16 bytes):
        bloom_enabled(u32) bloom_threshold(f32) bloom_intensity(f32) bloom_soft_knee(f32)
"""


from __future__ import annotations

import struct
from typing import Any

import numpy as np

from .._types import LIGHT_DTYPE, MATERIAL_DTYPE, VERTEX_DTYPE

__all__ = ["Scene3DSerializer"]

FLAG_HAS_RESOURCES = 1 << 0
FLAG_HAS_POST_PROCESS = 1 << 1

# Header: frame_id(u32) + flags(u32) + viewport_count(u32) + light_count(u32) + draw_group_count(u32)
_HEADER = struct.Struct("<IIIII")
# Viewport rect: x(u32) + y(u32) + w(u32) + h(u32)
_VP_RECT = struct.Struct("<IIII")
# Resource counts: mesh_count(u32) + texture_count(u32) + material_count(u32)
_RES_COUNTS = struct.Struct("<III")
# Mesh header: mesh_id(u32) + vertex_count(u32) + index_count(u32)
_MESH_HEADER = struct.Struct("<III")
# Texture header: tex_id(u32) + width(u32) + height(u32)
_TEX_HEADER = struct.Struct("<III")
# Draw group header: mesh_id(u32) + index_count(u32) + instance_count(u32) + pass_type(u32)
_GROUP_HEADER = struct.Struct("<IIII")
# Post-process: bloom_enabled(u32) + bloom_threshold(f32) + bloom_intensity(f32) + bloom_soft_knee(f32)
_POST_PROCESS = struct.Struct("<Ifff")

_MAT4_BYTES = 64  # 16 × f32


class Scene3DSerializer:
    """Serializes 3D scene state into binary frames for WebSocket streaming."""

    @staticmethod
    def serialize_frame(
        frame_id: int,
        viewports: list[dict[str, Any]],
        lights: np.ndarray,
        draw_groups: list[dict[str, Any]],
        resources: dict[str, Any] | None = None,
        post_process: dict[str, Any] | None = None,
    ) -> bytes:
        """Serialize a 3D scene frame into compact binary.

        Args:
            frame_id: Monotonic frame counter.
            viewports: List of dicts with keys
                ``x, y, width, height, view_matrix, proj_matrix``
                (matrices are 4x4 float32 arrays).
            lights: Structured numpy array with dtype ``LIGHT_DTYPE`` (may be
                ``None`` or empty; either serializes as zero lights).
            draw_groups: List of dicts with keys
                ``mesh_id, index_count, transforms, material_ids`` where
                transforms is (N, 4, 4) float32 and material_ids is (N,)
                uint32. An optional ``pass_type`` key defaults to 0 (OPAQUE).
            resources: Optional dict with keys ``meshes, textures, materials``
                for initial upload.
                meshes: list of (mesh_id, vertices, indices).
                textures: list of (tex_id, width, height, pixels).
                materials: structured numpy array with dtype ``MATERIAL_DTYPE``.
            post_process: Optional dict with bloom settings:
                bloom_enabled (bool), bloom_threshold (float),
                bloom_intensity (float), bloom_soft_knee (float).

        Returns:
            Compact binary frame ready for WebSocket transmission.

        Raises:
            ValueError: If a draw group's transform count does not match its
                material-id count (would silently corrupt the byte stream).
        """
        flags = FLAG_HAS_RESOURCES if resources is not None else 0
        if post_process is not None:
            flags |= FLAG_HAS_POST_PROCESS
        light_count = 0 if lights is None else len(lights)

        parts: list[bytes] = [
            _HEADER.pack(frame_id, flags, len(viewports), light_count, len(draw_groups))
        ]

        # Viewports: 16-byte rect followed by view/projection matrices
        # (64 bytes each, raw float32).
        for vp in viewports:
            parts.append(_VP_RECT.pack(vp["x"], vp["y"], vp["width"], vp["height"]))
            parts.append(np.asarray(vp["view_matrix"], dtype=np.float32).tobytes())
            parts.append(np.asarray(vp["proj_matrix"], dtype=np.float32).tobytes())

        # Resources (only on frames with flag bit 0 set).
        if resources is not None:
            parts.append(Scene3DSerializer._serialize_resources(resources))

        # Lights: raw LIGHT_DTYPE records.
        if light_count > 0:
            parts.append(np.asarray(lights, dtype=LIGHT_DTYPE).tobytes())

        # Draw groups: header + per-instance model matrices + material ids.
        for group in draw_groups:
            transforms = np.asarray(group["transforms"], dtype=np.float32)
            mat_ids = np.asarray(group["material_ids"], dtype=np.uint32)
            instance_count = len(mat_ids)
            # The header advertises len(mat_ids) instances but ALL transform
            # bytes are written; a count mismatch would desync every byte
            # that follows on the receiver. Fail loudly instead.
            if len(transforms) != instance_count:
                raise ValueError(
                    f"draw group mesh_id={group['mesh_id']}: "
                    f"{len(transforms)} transforms but {instance_count} material ids"
                )
            parts.append(
                _GROUP_HEADER.pack(
                    group["mesh_id"],
                    group["index_count"],
                    instance_count,
                    group.get("pass_type", 0),
                )
            )
            parts.append(transforms.tobytes())
            parts.append(mat_ids.tobytes())

        # Post-process (16 bytes, after draw groups, flag bit 1).
        if post_process is not None:
            parts.append(
                _POST_PROCESS.pack(
                    1 if post_process.get("bloom_enabled", False) else 0,
                    float(post_process.get("bloom_threshold", 1.0)),
                    float(post_process.get("bloom_intensity", 0.8)),
                    float(post_process.get("bloom_soft_knee", 0.5)),
                )
            )

        return b"".join(parts)

    @staticmethod
    def _serialize_resources(resources: dict[str, Any]) -> bytes:
        """Serialize the RESOURCES section (meshes, textures, materials)."""
        meshes = resources.get("meshes", [])
        textures = resources.get("textures", [])
        materials = resources.get("materials", np.empty(0, dtype=MATERIAL_DTYPE))

        parts: list[bytes] = [_RES_COUNTS.pack(len(meshes), len(textures), len(materials))]
        for mesh_id, vertices, indices in meshes:
            verts = np.asarray(vertices, dtype=VERTEX_DTYPE)
            idxs = np.asarray(indices, dtype=np.uint32)
            parts.append(_MESH_HEADER.pack(mesh_id, len(verts), len(idxs)))
            parts.append(verts.tobytes())
            parts.append(idxs.tobytes())
        for tex_id, width, height, pixels in textures:
            # Pixel payload is width × height × 4 bytes (RGBA8 per the wire
            # format docstring).
            parts.append(_TEX_HEADER.pack(tex_id, width, height))
            parts.append(np.asarray(pixels, dtype=np.uint8).tobytes())
        parts.append(np.asarray(materials, dtype=MATERIAL_DTYPE).tobytes())
        return b"".join(parts)

    @staticmethod
    def deserialize_frame(data: bytes) -> dict[str, Any]:
        """Deserialize a binary frame back into structured data (for testing).

        Args:
            data: A frame previously produced by :meth:`serialize_frame`.

        Returns:
            Dict with keys ``frame_id, flags, viewports, lights, draw_groups``
            plus ``resources`` / ``post_process`` when the corresponding flag
            bits are set.
        """
        off = 0
        frame_id, flags, vp_count, light_count, group_count = _HEADER.unpack_from(data, off)
        off += _HEADER.size

        # Viewports.
        viewports = []
        for _ in range(vp_count):
            x, y, w, h = _VP_RECT.unpack_from(data, off)
            off += _VP_RECT.size
            # .copy() detaches each matrix from the shared frame buffer.
            view_mat = (
                np.frombuffer(data[off : off + _MAT4_BYTES], dtype=np.float32)
                .reshape(4, 4)
                .copy()
            )
            off += _MAT4_BYTES
            proj_mat = (
                np.frombuffer(data[off : off + _MAT4_BYTES], dtype=np.float32)
                .reshape(4, 4)
                .copy()
            )
            off += _MAT4_BYTES
            viewports.append(
                {
                    "x": x,
                    "y": y,
                    "width": w,
                    "height": h,
                    "view_matrix": view_mat,
                    "proj_matrix": proj_mat,
                }
            )

        # Resources (optional).
        resources = None
        if flags & FLAG_HAS_RESOURCES:
            resources, off = Scene3DSerializer._deserialize_resources(data, off)

        # Lights.
        if light_count > 0:
            lb = light_count * LIGHT_DTYPE.itemsize
            lights = np.frombuffer(data[off : off + lb], dtype=LIGHT_DTYPE).copy()
            off += lb
        else:
            lights = np.empty(0, dtype=LIGHT_DTYPE)

        # Draw groups.
        draw_groups = []
        for _ in range(group_count):
            mesh_id, idx_count, inst_count, pass_type = _GROUP_HEADER.unpack_from(data, off)
            off += _GROUP_HEADER.size
            tb = inst_count * _MAT4_BYTES
            transforms = (
                np.frombuffer(data[off : off + tb], dtype=np.float32)
                .reshape(inst_count, 4, 4)
                .copy()
            )
            off += tb
            mid_bytes = inst_count * 4
            mat_ids = np.frombuffer(data[off : off + mid_bytes], dtype=np.uint32).copy()
            off += mid_bytes
            draw_groups.append(
                {
                    "mesh_id": mesh_id,
                    "index_count": idx_count,
                    "transforms": transforms,
                    "material_ids": mat_ids,
                    "pass_type": pass_type,
                }
            )

        # Post-process (optional, trails the draw groups).
        post_process = None
        if flags & FLAG_HAS_POST_PROCESS:
            bloom_en, bloom_thresh, bloom_int, bloom_knee = _POST_PROCESS.unpack_from(data, off)
            off += _POST_PROCESS.size
            post_process = {
                "bloom_enabled": bool(bloom_en),
                "bloom_threshold": bloom_thresh,
                "bloom_intensity": bloom_int,
                "bloom_soft_knee": bloom_knee,
            }

        result: dict[str, Any] = {
            "frame_id": frame_id,
            "flags": flags,
            "viewports": viewports,
            "lights": lights,
            "draw_groups": draw_groups,
        }
        if resources is not None:
            result["resources"] = resources
        if post_process is not None:
            result["post_process"] = post_process
        return result

    @staticmethod
    def _deserialize_resources(data: bytes, off: int) -> tuple[dict[str, Any], int]:
        """Parse the RESOURCES section starting at ``off``.

        Returns the parsed ``{"meshes", "textures", "materials"}`` dict and
        the offset just past the section.
        """
        mesh_count, tex_count, mat_count = _RES_COUNTS.unpack_from(data, off)
        off += _RES_COUNTS.size

        meshes = []
        for _ in range(mesh_count):
            mesh_id, vert_count, idx_count = _MESH_HEADER.unpack_from(data, off)
            off += _MESH_HEADER.size
            vb = vert_count * VERTEX_DTYPE.itemsize
            verts = np.frombuffer(data[off : off + vb], dtype=VERTEX_DTYPE).copy()
            off += vb
            ib = idx_count * 4  # u32 indices
            idxs = np.frombuffer(data[off : off + ib], dtype=np.uint32).copy()
            off += ib
            meshes.append((mesh_id, verts, idxs))

        textures = []
        for _ in range(tex_count):
            tex_id, tw, th = _TEX_HEADER.unpack_from(data, off)
            off += _TEX_HEADER.size
            pb = tw * th * 4  # 4 bytes per pixel
            pixels = np.frombuffer(data[off : off + pb], dtype=np.uint8).copy()
            off += pb
            textures.append((tex_id, tw, th, pixels))

        mb = mat_count * MATERIAL_DTYPE.itemsize
        materials = np.frombuffer(data[off : off + mb], dtype=MATERIAL_DTYPE).copy()
        off += mb
        return {"meshes": meshes, "textures": textures, "materials": materials}, off