Source code for simvx.core.assets.server

"""AssetServer: orchestrates Sources, Loaders, and a worker pool.

Public entry point for asset loading. Routes a URI through the right
:class:`~simvx.core.assets.source.Source` and :class:`~simvx.core.assets.loaders.base.Loader`,
runs the I/O on a thread pool, and marshals completion notifications
back onto the main thread via :meth:`flush` (called once per
``SceneTree.process(dt)`` tick).

Usage::

    server = AssetServer.instance()
    handle = server.load("pkg://game/textures/player.png")
    handle.completed.connect(lambda h: print("loaded:", h.state))

    batch = server.load_folder("pkg://game/textures/")
    batch.item_completed.connect(lambda h: print("loaded one:", h.uri))
    batch.completed.connect(lambda b: print("all done"))

The constructor is private-by-convention; use :meth:`AssetServer.instance`
to obtain the engine-wide singleton, or instantiate directly for
isolated tests.
"""

from __future__ import annotations

import json
import threading
from collections import deque
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from .handle import BatchHandle, Handle
from .loaders import BytesLoader, JsonLoader
from .loaders.base import Loader
from .source import Source
from .sources import FileSource, HttpSource, MemSource, PkgSource

__all__ = ["AssetServer"]


def _split_scheme(uri: str) -> str:
    """Return the URL scheme of *uri*, or ``"file"`` for bare paths."""
    if "://" in uri:
        return uri.split("://", 1)[0]
    return "file"


[docs] class AssetServer: """Async typed-asset orchestrator with byte-budget caches per loader.""" _instance: AssetServer | None = None _instance_lock = threading.Lock() def __init__(self, *, max_workers: int = 2) -> None: self._sources: dict[str, Source] = {} self._loaders: list[Loader] = [] self._fallback_loader: Loader = BytesLoader() self._pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="asset_server") self._completions: deque[tuple[Handle, str, Any]] = deque() self._completions_lock = threading.Lock() self._install_defaults() # ------------------------------------------------------------------ singleton
[docs] @classmethod def instance(cls) -> AssetServer: """Return the engine-wide singleton, creating it on first access.""" if cls._instance is None: with cls._instance_lock: if cls._instance is None: cls._instance = cls() return cls._instance
[docs] @classmethod def reset_instance(cls) -> None: """Tear down and clear the singleton (for tests).""" if cls._instance is not None: cls._instance.shutdown() cls._instance = None
# ------------------------------------------------------------------ lifecycle
[docs] def shutdown(self) -> None: self._pool.shutdown(wait=False, cancel_futures=True)
def _install_defaults(self) -> None: self.register_source(PkgSource()) self.register_source(FileSource()) http = HttpSource() # One impl serves both schemes. self._sources["http"] = http self._sources["https"] = http self.register_source(MemSource()) self.register_loader(JsonLoader()) # ------------------------------------------------------------------ extension
[docs] def register_source(self, source: Source) -> None: """Install ``source`` for its declared ``scheme``.""" self._sources[source.scheme] = source
[docs] def register_loader(self, loader: Loader) -> None: """Install a typed loader. Order of registration is the lookup order.""" self._loaders.append(loader)
[docs] def get_source(self, scheme: str) -> Source: if scheme not in self._sources: raise KeyError(f"AssetServer: no Source registered for scheme {scheme!r}") return self._sources[scheme]
def _select_loader(self, uri: str, override: Loader | None) -> Loader: if override is not None: return override for loader in self._loaders: if loader.claims(uri): return loader return self._fallback_loader # ------------------------------------------------------------------ loading
[docs] def load( self, uri: str, *, cache: bool = True, loader: Loader | None = None, ) -> Handle: """Submit ``uri`` for asynchronous loading. Returns a :class:`Handle` that exposes progress, state, errors, and a ``completed`` Signal. Uses the registered Source for the URI's scheme and the first Loader whose ``claims(uri)`` returns True (or the explicit ``loader=`` override). When ``cache`` is True, both lookup and store go through the loader's LRU. """ chosen_loader = self._select_loader(uri, loader) scheme = _split_scheme(uri) source = self.get_source(scheme) handle = Handle(uri) # Cache hit: synthesize an immediate completion that flush() will dispatch. if cache: cached_version = source.version(uri) cached = chosen_loader.get_cached(uri, cached_version) if cached is not None: self._enqueue_completion(handle, "loaded", cached) return handle handle._start() self._pool.submit(self._do_load, handle, source, chosen_loader, cache) return handle
[docs] def load_group( self, uris: Iterable[str], *, cache: bool = True, loader: Loader | None = None, ) -> BatchHandle: """Load every URI in ``uris`` concurrently. Returns a :class:`BatchHandle`.""" handles = [self.load(u, cache=cache, loader=loader) for u in uris] return BatchHandle(handles)
[docs] def load_folder( self, uri: str, *, cache: bool = True, loader: Loader | None = None, ) -> BatchHandle: """Enumerate ``uri`` via the Source's ``list()`` and load each child. Sources that cannot list (e.g. HTTP) raise :class:`NotImplementedError`; use :meth:`load_manifest` for those. """ scheme = _split_scheme(uri) source = self.get_source(scheme) children = list(source.list(uri)) return self.load_group(children, cache=cache, loader=loader)
[docs] def load_manifest( self, manifest_uri: str, *, cache: bool = True, loader: Loader | None = None, ) -> BatchHandle: """Load the URI list from a JSON manifest, then load each entry. The manifest must be a JSON array of URI strings, e.g.:: ["pkg://game/level1/floor.png", "https://cdn.example.com/level1.ogg"] This call blocks briefly to fetch + parse the manifest, then returns a BatchHandle for the listed assets. """ scheme = _split_scheme(manifest_uri) source = self.get_source(scheme) raw = source.read_bytes(manifest_uri) entries = json.loads(raw.decode("utf-8")) if not isinstance(entries, list) or not all(isinstance(e, str) for e in entries): raise ValueError( f"Manifest at {manifest_uri!r} must be a JSON array of URI strings" ) return self.load_group(entries, cache=cache, loader=loader)
# ------------------------------------------------------------------ worker def _do_load(self, handle: Handle, source: Source, loader: Loader, cache: bool) -> None: if handle._cancel_requested: self._enqueue_completion(handle, "cancelled", None) return try: raw = source.read_bytes(handle.uri) if handle._cancel_requested: self._enqueue_completion(handle, "cancelled", None) return value = loader.parse(raw, handle.uri) if cache: version = source.version(handle.uri) loader.store(handle.uri, version, value) self._enqueue_completion(handle, "loaded", value) except BaseException as exc: # noqa: BLE001: re-raised on main thread self._enqueue_completion(handle, "failed", exc) def _enqueue_completion(self, handle: Handle, kind: str, payload: Any) -> None: with self._completions_lock: self._completions.append((handle, kind, payload)) # ------------------------------------------------------------------ flush
[docs] def flush(self) -> None: """Drain the completion queue, dispatching each on the calling thread. ``SceneTree.process(dt)`` calls this once per frame so the ``completed`` Signal on each Handle / BatchHandle fires on the main thread, ≤1 frame after the worker actually finished. """ # Snapshot under lock; dispatch outside so handler code can touch # the bus without re-entering the lock. with self._completions_lock: if not self._completions: return pending = list(self._completions) self._completions.clear() for handle, kind, payload in pending: if kind == "loaded": handle._finish_loaded(payload) elif kind == "failed": handle._finish_failed(payload) elif kind == "cancelled": handle._finish_cancelled()
# ------------------------------------------------------------------ cache mgmt
[docs] def invalidate(self, uri: str) -> None: """Drop ``uri`` from every loader's cache.""" for loader in (*self._loaders, self._fallback_loader): loader.cache.invalidate(uri)
[docs] def clear_caches(self) -> None: """Drop every cached asset across every loader.""" for loader in (*self._loaders, self._fallback_loader): loader.cache.clear()