"""
Background resource loading -- threaded loading with progress tracking.
Provides a Godot-style interface for background resource loading with status
polling, suitable for loading screens and smooth level transitions.
Public API:
from simvx.core.resource_loader import ResourceLoader
ResourceLoader.load_threaded_request("mesh://sphere")
while True:
status, progress = ResourceLoader.load_threaded_get_status()
if status == "loaded":
resource = ResourceLoader.load_threaded_get()
break
update_loading_bar(progress)
"""
from __future__ import annotations
import logging
import threading
import time
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from enum import StrEnum
from typing import Any
log = logging.getLogger(__name__)
__all__ = ["ResourceLoader", "LoadStatus"]
[docs]
class LoadStatus(StrEnum):
"""Status of a background load request."""
IDLE = "idle"
LOADING = "loading"
LOADED = "loaded"
ERROR = "error"
CANCELLED = "cancelled"
class _LoadRequest:
"""Internal tracking for a single load request."""
__slots__ = (
"path",
"future",
"status",
"progress",
"result",
"error",
"loader",
"deadline",
"cancelled",
)
def __init__(self, path: str, loader: Callable[[str], Any]):
self.path = path
self.loader = loader
self.status = LoadStatus.LOADING
self.progress: float = 0.0
self.result: Any = None
self.error: Exception | None = None
self.future: Future | None = None
self.deadline: float | None = None
self.cancelled: bool = False
[docs]
class ResourceLoader:
"""Singleton threaded resource loader with progress tracking.
Supports loading any resource type via registered loaders. Built-in loaders
handle mesh:// and audio:// URIs via ResourceCache.
Pool size and a default per-load timeout can be configured via
:meth:`configure` before first use. Call :meth:`reset` first if the
singleton has already been created.
"""
_instance: ResourceLoader | None = None
_lock = threading.Lock()
_default_max_workers: int = 2
_default_timeout: float | None = None
def __init__(self):
self._pool = ThreadPoolExecutor(
max_workers=self._default_max_workers,
thread_name_prefix="resource_loader",
)
self._requests: dict[str, _LoadRequest] = {}
self._loaders: dict[str, Callable[[str], Any]] = {}
self._default_timeout_override: float | None = self._default_timeout
self._register_default_loaders()
def _register_default_loaders(self):
"""Register built-in resource loaders."""
self._loaders["mesh"] = self._load_mesh
self._loaders["audio"] = self._load_audio
self._loaders["scene"] = self._load_scene
[docs]
@classmethod
def get(cls) -> ResourceLoader:
"""Return the singleton instance."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
[docs]
@classmethod
def reset(cls):
"""Reset the singleton (for tests). Shuts down the thread pool."""
if cls._instance is not None:
cls._instance._pool.shutdown(wait=False)
cls._instance = None
[docs]
def register_loader(self, scheme: str, loader: Callable[[str], Any]) -> None:
"""Register a custom loader for a URI scheme.
Args:
scheme: URI scheme (e.g., "texture", "model").
loader: Callable that takes a URI string and returns the loaded resource.
"""
self._loaders[scheme] = loader
[docs]
@classmethod
def load_threaded_request(
cls,
path: str,
loader: Callable[[str], Any] | None = None,
*,
timeout: float | None = None,
) -> None:
"""Start loading a resource in the background.
Args:
path: Resource URI (e.g., "mesh://sphere", "audio://sfx/boom.wav").
loader: Optional custom loader function. If None, uses registered loader.
timeout: Optional per-request timeout in seconds. Overrides the
configured default. A waiting ``load_threaded_get()`` call
will raise ``TimeoutError`` once this deadline is exceeded;
the worker thread itself is not hard-cancelled.
"""
inst = cls.get()
if path in inst._requests:
req = inst._requests[path]
if req.status == LoadStatus.LOADING:
return # already loading
if loader is None:
scheme = path.split("://")[0] if "://" in path else "file"
loader = inst._loaders.get(scheme, inst._load_file)
req = _LoadRequest(path, loader)
effective_timeout = timeout if timeout is not None else inst._default_timeout_override
if effective_timeout is not None:
req.deadline = time.monotonic() + effective_timeout
inst._requests[path] = req
def _do_load():
try:
result = req.loader(path)
if req.cancelled:
return # discard result, status handled by cancel()
req.result = result
req.progress = 1.0
req.status = LoadStatus.LOADED
except Exception as e:
if req.cancelled:
return
req.error = e
req.status = LoadStatus.ERROR
log.error("resource_loader: failed to load %s: %s", path, e)
req.future = inst._pool.submit(_do_load)
[docs]
@classmethod
def cancel(cls, path: str) -> bool:
"""Cancel a pending load request.
If the worker has not yet started, the future is cancelled and the
request removed. Otherwise the cancellation is best-effort: the
loader continues to completion but its result is discarded.
Returns:
True if the request was cancelled before it started, False if it
was already running or did not exist.
"""
inst = cls.get()
req = inst._requests.get(path)
if req is None:
return False
req.cancelled = True
cancelled = False
if req.future is not None:
cancelled = req.future.cancel()
req.status = LoadStatus.CANCELLED
if cancelled:
inst._requests.pop(path, None)
return cancelled
[docs]
@classmethod
def load_threaded_get_status(cls, path: str = "") -> tuple[str, float]:
"""Check the status of a background load.
Args:
path: Resource URI. If empty, returns status of the most recent request.
Returns:
Tuple of (status_string, progress_float). Status is one of:
"idle", "loading", "loaded", "error", "cancelled".
"""
inst = cls.get()
if not path:
if not inst._requests:
return ("idle", 0.0)
req = next(reversed(inst._requests.values()))
else:
if (req := inst._requests.get(path)) is None:
return ("idle", 0.0)
# Enforce deadline lazily.
if (
req.status == LoadStatus.LOADING
and req.deadline is not None
and time.monotonic() >= req.deadline
):
req.error = TimeoutError(f"Load of {req.path} exceeded deadline")
req.status = LoadStatus.ERROR
if req.future is not None:
req.future.cancel()
return (req.status.value, req.progress)
[docs]
@classmethod
def load_threaded_get(cls, path: str = "", *, timeout: float | None = None) -> Any:
"""Get the loaded resource (blocks if still loading).
Args:
path: Resource URI. If empty, gets the most recent request.
timeout: Optional blocking timeout in seconds. Overrides any
deadline set on the request at submission time.
Returns:
The loaded resource object.
Raises:
TimeoutError: If the wait exceeds *timeout* or the request's
submission-time deadline.
RuntimeError: If loading failed or no request exists.
"""
inst = cls.get()
if not path:
if not inst._requests:
raise RuntimeError("No pending load requests")
path = next(reversed(inst._requests))
if (req := inst._requests.get(path)) is None:
raise RuntimeError(f"No load request for: {path}")
# Work out how long to wait: explicit timeout > submission deadline > None.
wait: float | None = timeout
if wait is None and req.deadline is not None:
wait = max(0.0, req.deadline - time.monotonic())
if req.future is not None:
try:
req.future.result(timeout=wait)
except FutureTimeoutError as exc:
req.error = TimeoutError(f"Load of {path} timed out after {wait}s")
req.status = LoadStatus.ERROR
raise req.error from exc
if req.status == LoadStatus.ERROR:
raise RuntimeError(f"Failed to load {path}: {req.error}")
result = req.result
# Clean up completed request
del inst._requests[path]
return result
[docs]
@classmethod
def is_loading(cls) -> bool:
"""Check if any resources are currently loading."""
inst = cls.get()
return any(r.status == LoadStatus.LOADING for r in inst._requests.values())
[docs]
@classmethod
def get_progress(cls) -> float:
"""Get overall loading progress across all pending requests (0.0 to 1.0)."""
inst = cls.get()
if not inst._requests:
return 1.0
total = sum(r.progress for r in inst._requests.values())
return total / len(inst._requests)
# --- Built-in loaders ---
@staticmethod
def _load_mesh(uri: str) -> Any:
from .resource import ResourceCache
return ResourceCache.get().resolve_mesh(uri)
@staticmethod
def _load_audio(uri: str) -> Any:
from .resource import ResourceCache
return ResourceCache.get().resolve_audio(uri)
@staticmethod
def _load_scene(uri: str) -> Any:
from .scene import load_scene
path = uri.replace("scene://", "") if uri.startswith("scene://") else uri
return load_scene(path)
@staticmethod
def _load_file(path: str) -> bytes:
"""Fallback loader: read raw file bytes."""
with open(path, "rb") as f:
return f.read()