Source code for simvx.core.assets.handle

"""Load handles: observable progress + completion for one or many loads.

A :class:`Handle` represents a single in-flight load. Consumers can:

* poll ``handle.state`` / ``handle.progress`` / ``handle.is_done``
* read ``handle.error`` (without raising) if a failure has occurred
* call ``handle.result(timeout=...)`` to block until done (raises on failure)
* connect to ``handle.completed`` (a Signal) to observe finish on the
  main thread, ≤1 frame after the worker actually finished

A :class:`BatchHandle` aggregates many handles, exposing combined
progress, an ``item_completed`` signal per child, and a single
``completed`` signal when all children are done.
"""

from __future__ import annotations

import threading
import time
from typing import Any, Final

from ..signals import Signal

__all__ = ["Handle", "BatchHandle", "PENDING", "LOADING", "LOADED", "FAILED", "CANCELLED"]


# Lifecycle states reported by ``Handle.state``. The last three are the
# terminal states checked by ``Handle.is_done``.
PENDING: Final = "pending"  # initial state of a freshly constructed Handle
LOADING: Final = "loading"  # set by Handle._start()
LOADED: Final = "loaded"  # terminal: set by Handle._finish_loaded()
FAILED: Final = "failed"  # terminal: set by Handle._finish_failed(); see Handle.error
CANCELLED: Final = "cancelled"  # terminal: set by Handle._finish_cancelled()


class Handle:
    """Track a single asset load from request to completion.

    Instances are created by :meth:`AssetServer.load`. The worker thread and
    the server's main-thread flush write most of the fields; user code is
    expected to read them only through the properties and methods below.
    """

    def __init__(self, uri: str) -> None:
        self.uri = uri
        self._state: str = PENDING
        self._progress: float = 0.0
        self._result: Any = None
        self._error: BaseException | None = None
        self._cancel_requested: bool = False
        self._done = threading.Event()
        # Emits this handle once when the state becomes terminal.
        self.completed = Signal()

    # ----- read-only views ----------------------------------------------------

    @property
    def state(self) -> str:
        """Current lifecycle state: ``"pending" | "loading" | "loaded" | "failed" | "cancelled"``."""
        return self._state

    @property
    def progress(self) -> float:
        """Fraction complete in 0.0–1.0. Real for HTTP loads; 0→1 in one tick for pkg/file/mem."""
        return self._progress

    @property
    def error(self) -> BaseException | None:
        """The exception that made the load fail, or ``None``. Reading it never raises."""
        return self._error

    @property
    def is_done(self) -> bool:
        """Whether the handle has reached a terminal state (loaded, failed or cancelled)."""
        terminal_states = (LOADED, FAILED, CANCELLED)
        return self._state in terminal_states

    # ----- consumer API -------------------------------------------------------

    def cancel(self) -> None:
        """Ask for the load to be cancelled; the worker stops on its next progress check."""
        self._cancel_requested = True

    def result(self, timeout: float | None = None) -> Any:
        """Wait for the load to finish, then return its value or re-raise its error.

        Note: when completion is dispatched via the SceneTree's per-frame
        flush, blocking here only works if that flush runs on another thread.
        In a synchronous test, prefer waiting via the ``completed`` Signal or
        running the server's flush manually.

        Args:
            timeout: Maximum number of seconds to wait; ``None`` waits forever.

        Returns:
            The stored load result.

        Raises:
            TimeoutError: The handle did not finish within ``timeout``.
            BaseException: The exception recorded for a failed load.
        """
        finished = self._done.wait(timeout)
        if not finished:
            raise TimeoutError(f"AssetServer load timed out: {self.uri!r}")
        failure = self._error
        if failure is not None:
            raise failure
        return self._result

    # ----- internal state machine (called by AssetServer) ---------------------

    def _start(self) -> None:
        """Move the handle into the loading state."""
        self._state = LOADING

    def _set_progress(self, fraction: float) -> None:
        """Store ``fraction`` clamped into the closed range [0.0, 1.0]."""
        capped = min(1.0, fraction)
        self._progress = max(0.0, capped)

    def _settle(self, terminal_state: str) -> None:
        """Record a terminal state, release blocked waiters, then notify listeners."""
        self._state = terminal_state
        self._done.set()
        self.completed.emit(self)

    def _finish_loaded(self, result: Any) -> None:
        """Store the loaded value, force progress to 1.0 and finish as loaded."""
        self._result = result
        self._progress = 1.0
        self._settle(LOADED)

    def _finish_failed(self, exc: BaseException) -> None:
        """Store the failure and finish as failed."""
        self._error = exc
        self._settle(FAILED)

    def _finish_cancelled(self) -> None:
        """Finish as cancelled."""
        self._settle(CANCELLED)

    def __repr__(self) -> str:
        return f"Handle({self.uri!r}, state={self._state}, progress={self._progress:.2f})"
class BatchHandle:
    """Aggregate of multiple :class:`Handle` objects.

    Emits :attr:`item_completed` once per child as it finishes (any terminal
    state) and :attr:`completed` once when all children are done. Drives a
    loading-screen progress bar via :attr:`progress`.
    """

    def __init__(self, handles: list[Handle]) -> None:
        # Copy so later mutation of the caller's list cannot change the batch.
        self.handles = list(handles)
        self.item_completed = Signal()  # emits child Handle
        self.completed = Signal()  # emits self
        # Guards against emitting ``completed`` more than once.
        self._fired_complete = False
        for h in self.handles:
            h.completed.connect(self._on_child_completed)
        if not self.handles:
            # An empty batch is trivially done.
            # NOTE(review): this emit happens during construction, presumably
            # before any listener can connect; callers of a possibly-empty
            # batch should check ``is_done`` -- confirm against Signal semantics.
            self._fired_complete = True
            self.completed.emit(self)

    # ----- aggregate views ----------------------------------------------------

    @property
    def total(self) -> int:
        """Number of child handles in the batch."""
        return len(self.handles)

    @property
    def completed_count(self) -> int:
        """Number of children in the loaded state (failed/cancelled are not counted)."""
        return sum(1 for h in self.handles if h.state == LOADED)

    @property
    def failed_count(self) -> int:
        """Number of children in the failed state."""
        return sum(1 for h in self.handles if h.state == FAILED)

    @property
    def cancelled_count(self) -> int:
        """Number of children in the cancelled state."""
        return sum(1 for h in self.handles if h.state == CANCELLED)

    @property
    def progress(self) -> float:
        """Sum of per-handle progress / total. Partial credit for in-flight HTTP.

        An empty batch reports 1.0.
        """
        if not self.handles:
            return 1.0
        return sum(h.progress for h in self.handles) / len(self.handles)

    @property
    def is_done(self) -> bool:
        """True once every child has reached a terminal state (vacuously true when empty)."""
        return all(h.is_done for h in self.handles)

    # ----- consumer API -------------------------------------------------------

    def cancel(self) -> None:
        """Cancel every child that has not yet finished."""
        for h in self.handles:
            if not h.is_done:
                h.cancel()

    def results(self, timeout: float | None = None) -> dict[str, Any]:
        """Block until all children are done; return ``{uri: value}``.

        Args:
            timeout: Maximum number of seconds to wait for the *whole batch*,
                or ``None`` to wait indefinitely. The budget is shared by all
                children rather than restarted for each one.

        Returns:
            Mapping from each child's ``uri`` to its loaded value. If several
            children share a URI, the last one in ``handles`` wins.

        Raises:
            TimeoutError: A child had not finished when the budget ran out.
            BaseException: The error of the first failed child, in ``handles``
                order.
        """
        # One deadline for the whole batch: giving every child the full
        # timeout would let N children block for up to N * timeout.
        deadline = None if timeout is None else time.monotonic() + timeout
        loaded: dict[str, Any] = {}
        for h in self.handles:
            if deadline is None:
                remaining = None
            else:
                # Clamp so an exhausted budget becomes a non-blocking check.
                remaining = max(0.0, deadline - time.monotonic())
            loaded[h.uri] = h.result(timeout=remaining)
        return loaded

    # ----- internal -----------------------------------------------------------

    def _on_child_completed(self, child: Handle) -> None:
        """Relay a child's completion and fire ``completed`` once all are done."""
        self.item_completed.emit(child)
        if not self._fired_complete and self.is_done:
            self._fired_complete = True
            self.completed.emit(self)

    def __repr__(self) -> str:
        return (
            f"BatchHandle(total={self.total}, completed={self.completed_count}, "
            f"failed={self.failed_count}, progress={self.progress:.2f})"
        )