Source code for simvx.core.resource_loader

"""
Background resource loading -- threaded loading with progress tracking.

Provides a Godot-style interface for background resource loading with status
polling, suitable for loading screens and smooth level transitions.

Public API:
    from simvx.core.resource_loader import ResourceLoader

    ResourceLoader.load_threaded_request("mesh://sphere")
    while True:
        status, progress = ResourceLoader.load_threaded_get_status()
        if status == "loaded":
            resource = ResourceLoader.load_threaded_get()
            break
        update_loading_bar(progress)
"""


from __future__ import annotations

import logging
import threading
import time
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from enum import StrEnum
from typing import Any

log = logging.getLogger(__name__)

__all__ = ["ResourceLoader", "LoadStatus"]


[docs] class LoadStatus(StrEnum): """Status of a background load request.""" IDLE = "idle" LOADING = "loading" LOADED = "loaded" ERROR = "error" CANCELLED = "cancelled"
class _LoadRequest: """Internal tracking for a single load request.""" __slots__ = ( "path", "future", "status", "progress", "result", "error", "loader", "deadline", "cancelled", ) def __init__(self, path: str, loader: Callable[[str], Any]): self.path = path self.loader = loader self.status = LoadStatus.LOADING self.progress: float = 0.0 self.result: Any = None self.error: Exception | None = None self.future: Future | None = None self.deadline: float | None = None self.cancelled: bool = False
[docs] class ResourceLoader: """Singleton threaded resource loader with progress tracking. Supports loading any resource type via registered loaders. Built-in loaders handle mesh:// and audio:// URIs via ResourceCache. Pool size and a default per-load timeout can be configured via :meth:`configure` before first use. Call :meth:`reset` first if the singleton has already been created. """ _instance: ResourceLoader | None = None _lock = threading.Lock() _default_max_workers: int = 2 _default_timeout: float | None = None def __init__(self): self._pool = ThreadPoolExecutor( max_workers=self._default_max_workers, thread_name_prefix="resource_loader", ) self._requests: dict[str, _LoadRequest] = {} self._loaders: dict[str, Callable[[str], Any]] = {} self._default_timeout_override: float | None = self._default_timeout self._register_default_loaders()
[docs] @classmethod def configure(cls, *, max_workers: int = 2, default_timeout: float | None = None) -> None: """Set pool size and default per-load timeout. Must be called before the singleton is first accessed. Call :meth:`reset` first to reconfigure an existing instance. Args: max_workers: Thread pool size (must be >= 1). default_timeout: Default timeout in seconds for all loads, or ``None`` for no timeout. """ if max_workers < 1: raise ValueError(f"max_workers must be >= 1, got {max_workers}") if default_timeout is not None and default_timeout <= 0: raise ValueError(f"default_timeout must be > 0, got {default_timeout}") with cls._lock: if cls._instance is not None: raise RuntimeError( "ResourceLoader.configure() must be called before first use. " "Call ResourceLoader.reset() first to reconfigure." ) cls._default_max_workers = max_workers cls._default_timeout = default_timeout
def _register_default_loaders(self): """Register built-in resource loaders.""" self._loaders["mesh"] = self._load_mesh self._loaders["audio"] = self._load_audio self._loaders["scene"] = self._load_scene
[docs] @classmethod def get(cls) -> ResourceLoader: """Return the singleton instance.""" if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = cls() return cls._instance
[docs] @classmethod def reset(cls): """Reset the singleton (for tests). Shuts down the thread pool.""" if cls._instance is not None: cls._instance._pool.shutdown(wait=False) cls._instance = None
[docs] def register_loader(self, scheme: str, loader: Callable[[str], Any]) -> None: """Register a custom loader for a URI scheme. Args: scheme: URI scheme (e.g., "texture", "model"). loader: Callable that takes a URI string and returns the loaded resource. """ self._loaders[scheme] = loader
[docs] @classmethod def load_threaded_request( cls, path: str, loader: Callable[[str], Any] | None = None, *, timeout: float | None = None, ) -> None: """Start loading a resource in the background. Args: path: Resource URI (e.g., "mesh://sphere", "audio://sfx/boom.wav"). loader: Optional custom loader function. If None, uses registered loader. timeout: Optional per-request timeout in seconds. Overrides the configured default. A waiting ``load_threaded_get()`` call will raise ``TimeoutError`` once this deadline is exceeded; the worker thread itself is not hard-cancelled. """ inst = cls.get() if path in inst._requests: req = inst._requests[path] if req.status == LoadStatus.LOADING: return # already loading if loader is None: scheme = path.split("://")[0] if "://" in path else "file" loader = inst._loaders.get(scheme, inst._load_file) req = _LoadRequest(path, loader) effective_timeout = timeout if timeout is not None else inst._default_timeout_override if effective_timeout is not None: req.deadline = time.monotonic() + effective_timeout inst._requests[path] = req def _do_load(): try: result = req.loader(path) if req.cancelled: return # discard result, status handled by cancel() req.result = result req.progress = 1.0 req.status = LoadStatus.LOADED except Exception as e: if req.cancelled: return req.error = e req.status = LoadStatus.ERROR log.error("resource_loader: failed to load %s: %s", path, e) req.future = inst._pool.submit(_do_load)
[docs] @classmethod def cancel(cls, path: str) -> bool: """Cancel a pending load request. If the worker has not yet started, the future is cancelled and the request removed. Otherwise the cancellation is best-effort: the loader continues to completion but its result is discarded. Returns: True if the request was cancelled before it started, False if it was already running or did not exist. """ inst = cls.get() req = inst._requests.get(path) if req is None: return False req.cancelled = True cancelled = False if req.future is not None: cancelled = req.future.cancel() req.status = LoadStatus.CANCELLED if cancelled: inst._requests.pop(path, None) return cancelled
[docs] @classmethod def load_threaded_get_status(cls, path: str = "") -> tuple[str, float]: """Check the status of a background load. Args: path: Resource URI. If empty, returns status of the most recent request. Returns: Tuple of (status_string, progress_float). Status is one of: "idle", "loading", "loaded", "error", "cancelled". """ inst = cls.get() if not path: if not inst._requests: return ("idle", 0.0) req = next(reversed(inst._requests.values())) else: if (req := inst._requests.get(path)) is None: return ("idle", 0.0) # Enforce deadline lazily. if ( req.status == LoadStatus.LOADING and req.deadline is not None and time.monotonic() >= req.deadline ): req.error = TimeoutError(f"Load of {req.path} exceeded deadline") req.status = LoadStatus.ERROR if req.future is not None: req.future.cancel() return (req.status.value, req.progress)
[docs] @classmethod def load_threaded_get(cls, path: str = "", *, timeout: float | None = None) -> Any: """Get the loaded resource (blocks if still loading). Args: path: Resource URI. If empty, gets the most recent request. timeout: Optional blocking timeout in seconds. Overrides any deadline set on the request at submission time. Returns: The loaded resource object. Raises: TimeoutError: If the wait exceeds *timeout* or the request's submission-time deadline. RuntimeError: If loading failed or no request exists. """ inst = cls.get() if not path: if not inst._requests: raise RuntimeError("No pending load requests") path = next(reversed(inst._requests)) if (req := inst._requests.get(path)) is None: raise RuntimeError(f"No load request for: {path}") # Work out how long to wait: explicit timeout > submission deadline > None. wait: float | None = timeout if wait is None and req.deadline is not None: wait = max(0.0, req.deadline - time.monotonic()) if req.future is not None: try: req.future.result(timeout=wait) except FutureTimeoutError as exc: req.error = TimeoutError(f"Load of {path} timed out after {wait}s") req.status = LoadStatus.ERROR raise req.error from exc if req.status == LoadStatus.ERROR: raise RuntimeError(f"Failed to load {path}: {req.error}") result = req.result # Clean up completed request del inst._requests[path] return result
[docs] @classmethod def is_loading(cls) -> bool: """Check if any resources are currently loading.""" inst = cls.get() return any(r.status == LoadStatus.LOADING for r in inst._requests.values())
[docs] @classmethod def get_progress(cls) -> float: """Get overall loading progress across all pending requests (0.0 to 1.0).""" inst = cls.get() if not inst._requests: return 1.0 total = sum(r.progress for r in inst._requests.values()) return total / len(inst._requests)
# --- Built-in loaders --- @staticmethod def _load_mesh(uri: str) -> Any: from .resource import ResourceCache return ResourceCache.get().resolve_mesh(uri) @staticmethod def _load_audio(uri: str) -> Any: from .resource import ResourceCache return ResourceCache.get().resolve_audio(uri) @staticmethod def _load_scene(uri: str) -> Any: from .scene import load_scene path = uri.replace("scene://", "") if uri.startswith("scene://") else uri return load_scene(path) @staticmethod def _load_file(path: str) -> bytes: """Fallback loader: read raw file bytes.""" with open(path, "rb") as f: return f.read()