# Source code for simvx.graphics.streaming.server

"""WebSocket JPEG streaming server — captures frames from the Vulkan renderer and streams to browsers."""


from __future__ import annotations

import asyncio
import io
import json
import logging
import queue
import threading
from pathlib import Path

import numpy as np
from aiohttp import web
from PIL import Image

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Public API: only the server class is exported by ``from ... import *``.
__all__ = ["StreamingServer"]

# Directory next to this file that holds the browser client (index.html).
_CLIENT_DIR = Path(__file__).parent / "client"


class StreamingServer:
    """WebSocket JPEG streaming server for SimVX.

    Serves a browser client over HTTP and streams rendered frames as JPEG
    images over WebSocket. Input events sent by the browser are queued so the
    engine can collect them on its own thread via :meth:`drain_input`.

    The HTTP/WebSocket machinery runs on a private asyncio event loop inside a
    daemon thread; the engine talks to it only through thread-safe queues.

    Usage::

        server = StreamingServer(host="0.0.0.0", port=8080)
        app = App(width=1280, height=720, visible=False)
        app.run_streaming(MyGameScene(), server)
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8080, quality: int = 85, fps: int = 30) -> None:
        """Store the configuration; nothing is started until :meth:`start`.

        Args:
            host: Interface the HTTP server binds to.
            port: TCP port the HTTP server binds to.
            quality: JPEG quality passed to the encoder.
            fps: Target pacing of the broadcast loop, in frames per second.
        """
        self.host = host
        self.port = port
        self.quality = quality
        self.fps = fps
        # Engine -> server: small bound so a slow encoder drops frames instead of lagging.
        self._frame_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=4)
        # Server -> engine: decoded JSON input events awaiting drain_input().
        self._input_queue: queue.Queue[dict] = queue.Queue(maxsize=256)
        self._ws_clients: list[web.WebSocketResponse] = []
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._running = False
        self._width = 0
        self._height = 0

    def start(self, width: int, height: int) -> None:
        """Start the streaming server in a background thread.

        Args:
            width: Viewport width reported by ``/api/config``.
            height: Viewport height reported by ``/api/config``.

        Calling this while the server thread is still alive only updates the
        reported viewport size; a second thread is not spawned, because it
        could not bind the same port anyway.
        """
        self._width = width
        self._height = height
        if self._thread is not None and self._thread.is_alive():
            log.warning("Streaming server already running; ignoring start()")
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_server, daemon=True, name="simvx-streaming")
        self._thread.start()
        log.info("Streaming server starting on http://%s:%d", self.host, self.port)

    def stop(self) -> None:
        """Stop the streaming server and close all WebSocket connections.

        Waits up to five seconds for the server thread to exit.
        """
        self._running = False
        loop = self._loop
        if loop is not None and loop.is_running():
            shutdown = self._shutdown()
            try:
                asyncio.run_coroutine_threadsafe(shutdown, loop)
            except RuntimeError:
                # The loop closed between the is_running() check and scheduling;
                # discard the coroutine so it is not reported as never awaited.
                shutdown.close()
        if self._thread:
            self._thread.join(timeout=5.0)
        log.info("Streaming server stopped")

    def push_frame(self, pixels: np.ndarray) -> None:
        """Push a captured frame for streaming.

        Thread-safe, called from the engine's main loop. Never blocks: when
        the queue is full (the encoder is behind) the frame is dropped
        silently.
        """
        try:
            self._frame_queue.put_nowait(pixels)
        except queue.Full:
            pass  # Drop frame — encoder is behind

    def drain_input(self) -> list[dict]:
        """Drain all queued input events. Called from the main thread.

        Returns:
            The queued events in arrival order; empty when nothing is pending.
        """
        events = []
        while True:
            try:
                events.append(self._input_queue.get_nowait())
            except queue.Empty:
                break
        return events

    def has_clients(self) -> bool:
        """Whether any browser clients are connected."""
        return bool(self._ws_clients)

    # -- Internal --

    def _encode_jpeg(self, pixels: np.ndarray) -> bytes:
        """Encode a pixel array to JPEG bytes.

        Only the first three channels are kept (JPEG has no alpha).
        NOTE(review): assumes ``pixels`` is an (H, W, 4) RGBA uint8 array —
        confirm against the capture code.
        """
        rgb = pixels[:, :, :3]  # Drop alpha
        img = Image.fromarray(rgb)
        buf = io.BytesIO()
        img.save(buf, "JPEG", quality=self.quality)
        return buf.getvalue()

    def _run_server(self) -> None:
        """Background thread: run the aiohttp event loop until the server exits."""
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._start_web())
        except Exception:
            log.exception("Streaming server error")
        finally:
            # Reflect reality even when startup failed (e.g. port already in use).
            self._running = False
            loop.close()

    async def _start_web(self) -> None:
        """Set up aiohttp routes and run the broadcast loop until stopped."""
        app = web.Application()
        app.router.add_get("/", self._handle_index)
        app.router.add_get("/ws", self._handle_ws)
        app.router.add_get("/api/config", self._handle_config)
        runner = web.AppRunner(app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
            log.info("Streaming server ready at http://%s:%d", self.host, self.port)

            loop = asyncio.get_running_loop()
            # Broadcast loop — pace at target FPS
            while self._running:
                try:
                    pixels = self._frame_queue.get_nowait()
                except queue.Empty:
                    pixels = None
                # The frame is consumed either way; encoding is skipped when
                # nobody is connected to receive it.
                if pixels is not None and self._ws_clients:
                    # Encoding is CPU-bound; run it off the loop so WebSocket
                    # input keeps being serviced meanwhile.
                    jpeg = await loop.run_in_executor(None, self._encode_jpeg, pixels)
                    await self._broadcast(jpeg)
                # Clamp so a zero or negative fps cannot raise ZeroDivisionError.
                await asyncio.sleep(1.0 / max(self.fps, 1))
        finally:
            # Always release the listening socket, even if the loop raised.
            await runner.cleanup()

    async def _shutdown(self) -> None:
        """Close all WebSocket connections and mark the server as stopped."""
        for ws_resp in list(self._ws_clients):
            try:
                await ws_resp.close()
            except Exception:
                # One misbehaving connection must not keep the others open.
                log.debug("Error closing WebSocket client", exc_info=True)
        self._ws_clients.clear()
        self._running = False

    async def _broadcast(self, data: bytes) -> None:
        """Send a binary JPEG frame to all connected WebSocket clients.

        Clients whose connection has failed are removed from the client list.
        """
        dead = []
        # Iterate a snapshot: _handle_ws may add or remove clients while a
        # send is awaited.
        for ws_resp in tuple(self._ws_clients):
            try:
                await ws_resp.send_bytes(data)
            except ConnectionError:
                dead.append(ws_resp)
        for ws_resp in dead:
            # The connection handler may already have removed it.
            if ws_resp in self._ws_clients:
                self._ws_clients.remove(ws_resp)

    async def _handle_index(self, _request: web.Request) -> web.StreamResponse:
        """Serve the browser client, or a 404 when ``index.html`` is missing."""
        index_path = _CLIENT_DIR / "index.html"
        if index_path.exists():
            return web.FileResponse(index_path)
        return web.Response(text="SimVX Streaming — client not found", status=404)

    async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
        """WebSocket endpoint: binary JPEG frames out, JSON input events in."""
        ws_resp = web.WebSocketResponse()
        await ws_resp.prepare(request)
        self._ws_clients.append(ws_resp)
        log.info("Streaming client connected (%d total)", len(self._ws_clients))
        try:
            async for msg in ws_resp:
                if msg.type == web.WSMsgType.TEXT:
                    self._handle_input(msg.data)
                elif msg.type == web.WSMsgType.ERROR:
                    log.debug("WebSocket error: %s", ws_resp.exception())
        finally:
            # _broadcast or _shutdown may already have dropped this client.
            if ws_resp in self._ws_clients:
                self._ws_clients.remove(ws_resp)
            log.info("Streaming client disconnected (%d remaining)", len(self._ws_clients))
        return ws_resp

    async def _handle_config(self, _request: web.Request) -> web.Response:
        """Return the viewport size given to :meth:`start` as JSON."""
        return web.json_response({"width": self._width, "height": self._height})

    def _handle_input(self, message: str) -> None:
        """Queue an input event from the browser for the main thread.

        The message comes from an untrusted client. Anything that is not valid
        JSON, or that does not decode to a JSON object, is dropped. Events are
        also dropped when the input queue is full.
        """
        try:
            data = json.loads(message)
        except (ValueError, TypeError, RecursionError):
            # Malformed, wrongly typed, or pathologically nested payload.
            return
        if not isinstance(data, dict):
            return  # drain_input() promises dict events only
        try:
            self._input_queue.put_nowait(data)
        except queue.Full:
            pass  # Drop event — the engine is not draining fast enough