Source code for simvx.graphics.streaming.server
"""WebSocket JPEG streaming server — captures frames from the Vulkan renderer and streams to browsers."""
from __future__ import annotations
import asyncio
import io
import json
import logging
import queue
import threading
from pathlib import Path
import numpy as np
from aiohttp import web
from PIL import Image
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Public API of this module: only StreamingServer is exported via ``import *``.
__all__ = ["StreamingServer"]
# Directory holding the browser client assets: a "client" folder next to this file.
# The index handler serves ``index.html`` from here.
_CLIENT_DIR = Path(__file__).parent / "client"
[docs]
class StreamingServer:
    """WebSocket JPEG streaming server for SimVX.

    Serves a browser client and streams rendered frames as JPEG over WebSocket.
    JSON input events received from the browser are parsed and placed on an
    internal queue (``_input_queue``) for the engine to consume on its own thread.

    The HTTP/WebSocket server runs on a private asyncio event loop inside a
    daemon thread created by :meth:`start`; frames are handed over from the
    engine thread through a bounded queue via :meth:`push_frame`.

    Usage::

        server = StreamingServer(host="0.0.0.0", port=8080)
        app = App(width=1280, height=720, visible=False)
        app.run_streaming(MyGameScene(), server)
    """

    # Pacing rate used when ``fps`` is zero or negative, so the broadcast loop
    # never divides by zero.
    _DEFAULT_FPS = 30

    def __init__(self, host: str = "0.0.0.0", port: int = 8080, quality: int = 85, fps: int = 30) -> None:
        """Store the configuration; no sockets or threads are created here.

        Args:
            host: Interface address the HTTP server binds to.
            port: TCP port the HTTP server binds to.
            quality: JPEG quality passed to the encoder.
            fps: Target pacing rate of the broadcast loop, in frames per second.
        """
        self.host = host
        self.port = port
        self.quality = quality
        self.fps = fps
        # Engine thread -> server thread: captured frames awaiting encoding.
        self._frame_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=4)
        # Server thread -> engine thread: decoded JSON input events.
        self._input_queue: queue.Queue[dict] = queue.Queue(maxsize=256)
        self._ws_clients: list[web.WebSocketResponse] = []
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._running = False
        self._width = 0
        self._height = 0

    def start(self, width: int, height: int) -> None:
        """Start the streaming server in a background thread.

        Args:
            width: Viewport width reported by the ``/api/config`` endpoint.
            height: Viewport height reported by the ``/api/config`` endpoint.

        Calling this while the server thread is still alive only updates the
        reported viewport size; a second thread is not spawned.
        """
        self._width = width
        self._height = height
        if self._thread is not None and self._thread.is_alive():
            # A second thread would fail to bind the port and would overwrite
            # ``_loop``, leaving stop() unable to reach the loop actually serving.
            log.warning("Streaming server already running; ignoring start()")
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_server, daemon=True, name="simvx-streaming")
        self._thread.start()
        log.info("Streaming server starting on http://%s:%d", self.host, self.port)

    def stop(self) -> None:
        """Stop the streaming server and close all WebSocket connections.

        Waits up to five seconds for the server thread to exit.
        """
        self._running = False
        loop = self._loop
        if loop is not None and loop.is_running():
            shutdown = self._shutdown()
            try:
                asyncio.run_coroutine_threadsafe(shutdown, loop)
            except RuntimeError:
                # The loop closed between the check and the call; dispose of the
                # coroutine so it does not trigger a "never awaited" warning.
                shutdown.close()
        thread = self._thread
        if thread is not None:
            thread.join(timeout=5.0)
            if thread.is_alive():
                log.warning("Streaming server thread did not exit within 5 seconds")
                return
            self._thread = None
        log.info("Streaming server stopped")

    def push_frame(self, pixels: np.ndarray) -> None:
        """Push a captured frame for streaming. Thread-safe, called from the engine's main loop.

        Drops frame silently if the encoder is behind (non-blocking).
        """
        try:
            self._frame_queue.put_nowait(pixels)
        except queue.Full:
            pass  # Drop frame: the encoder is behind

    def has_clients(self) -> bool:
        """Whether any browser clients are connected."""
        return bool(self._ws_clients)

    # -- Internal --

    def _frame_interval(self) -> float:
        """Return the delay between broadcast iterations, in seconds.

        Reads ``fps`` on every call so changes take effect immediately, and
        falls back to ``_DEFAULT_FPS`` when ``fps`` is not positive.
        """
        fps = self.fps if self.fps > 0 else self._DEFAULT_FPS
        return 1.0 / fps

    def _encode_jpeg(self, pixels: np.ndarray) -> bytes:
        """Encode RGBA numpy array to JPEG bytes."""
        rgb = pixels[:, :, :3]  # Keep the first three channels; JPEG has no alpha
        img = Image.fromarray(rgb)
        buf = io.BytesIO()
        img.save(buf, "JPEG", quality=self.quality)
        return buf.getvalue()

    def _run_server(self) -> None:
        """Background thread: create an event loop and run the server on it until it exits."""
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._start_web())
        except Exception:
            # Top-level boundary of the server thread: log instead of dying silently.
            log.exception("Streaming server error")
        finally:
            # Reflect that the server is no longer serving, even after a crash.
            self._running = False
            loop.close()

    async def _start_web(self) -> None:
        """Set up aiohttp routes, start listening, and run the broadcast loop."""
        app = web.Application()
        app.router.add_get("/", self._handle_index)
        app.router.add_get("/ws", self._handle_ws)
        app.router.add_get("/api/config", self._handle_config)
        runner = web.AppRunner(app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
            log.info("Streaming server ready at http://%s:%d", self.host, self.port)
            # Broadcast loop: at most one frame per iteration, paced by the frame interval
            while self._running:
                try:
                    pixels = self._frame_queue.get_nowait()
                except queue.Empty:
                    pass
                else:
                    # Encoding is the expensive step, so skip it when nobody is connected.
                    if self._ws_clients:
                        await self._broadcast(self._encode_jpeg(pixels))
                await asyncio.sleep(self._frame_interval())
        finally:
            # Always release the listening socket and open connections, including
            # when binding fails or the loop body raises.
            await runner.cleanup()

    async def _shutdown(self) -> None:
        """Close all WebSocket connections and signal the broadcast loop to stop."""
        self._running = False
        clients = list(self._ws_clients)
        self._ws_clients.clear()
        # return_exceptions=True so one failing close() does not leave the rest open.
        results = await asyncio.gather(*(ws_resp.close() for ws_resp in clients), return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                log.debug("Error while closing WebSocket: %r", result)

    async def _broadcast(self, data: bytes) -> None:
        """Send binary JPEG frame to all connected WebSocket clients.

        Clients whose connection has failed are removed from the client list.
        """
        # Iterate over a snapshot: connection handlers may add or remove clients
        # while this coroutine is suspended inside send_bytes().
        for ws_resp in list(self._ws_clients):
            try:
                await ws_resp.send_bytes(data)
            except ConnectionError:
                # CancelledError is deliberately not caught so cancellation propagates.
                # The handler's own cleanup may already have removed this client.
                if ws_resp in self._ws_clients:
                    self._ws_clients.remove(ws_resp)

    async def _handle_index(self, _request: web.Request) -> web.StreamResponse:
        """Serve the browser client page, or a 404 response if it is missing."""
        index_path = _CLIENT_DIR / "index.html"
        if index_path.is_file():
            return web.FileResponse(index_path)
        return web.Response(text="SimVX Streaming — client not found", status=404)

    async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
        """WebSocket endpoint: binary JPEG frames out, JSON input events in."""
        ws_resp = web.WebSocketResponse()
        await ws_resp.prepare(request)
        self._ws_clients.append(ws_resp)
        log.info("Streaming client connected (%d total)", len(self._ws_clients))
        try:
            async for msg in ws_resp:
                if msg.type == web.WSMsgType.TEXT:
                    self._handle_input(msg.data)
                elif msg.type == web.WSMsgType.ERROR:
                    log.debug("WebSocket error: %s", ws_resp.exception())
        finally:
            # The client may already have been dropped by _broadcast or _shutdown.
            if ws_resp in self._ws_clients:
                self._ws_clients.remove(ws_resp)
            log.info("Streaming client disconnected (%d remaining)", len(self._ws_clients))
        return ws_resp

    async def _handle_config(self, _request: web.Request) -> web.Response:
        """Return engine viewport configuration."""
        return web.json_response({"width": self._width, "height": self._height})

    def _handle_input(self, message: str) -> None:
        """Queue input events from browser client for processing on the main thread.

        The message is untrusted: anything that is not a JSON object is ignored,
        and events are dropped silently when the input queue is full.
        """
        try:
            data = json.loads(message)
        except (json.JSONDecodeError, TypeError, RecursionError):
            return
        if not isinstance(data, dict):
            # The queue carries dict events only; discard scalars and arrays.
            return
        try:
            self._input_queue.put_nowait(data)
        except queue.Full:
            pass  # Drop event: the consumer is behind