# Source code for simvx.core.assets.handle
"""Load handles: observable progress + completion for one or many loads.
A :class:`Handle` represents a single in-flight load. Consumers can:
* poll ``handle.state`` / ``handle.progress`` / ``handle.is_done``
* read ``handle.error`` (without raising) if a failure has occurred
* call ``handle.result(timeout=...)`` to block until done (raises on failure)
* connect to ``handle.completed`` (a Signal) to observe finish on the
main thread, ≤1 frame after the worker actually finished
A :class:`BatchHandle` aggregates many handles, exposing combined
progress, an ``item_completed`` signal per child, and a single
``completed`` signal when all children are done.
"""
from __future__ import annotations

import threading
import time
from typing import Any, Final

from ..signals import Signal
# Lifecycle states reported by ``Handle.state``. PENDING and LOADING are
# transient; LOADED, FAILED and CANCELLED are the terminal states that
# ``Handle.is_done`` checks for.
PENDING: Final = "pending"
LOADING: Final = "loading"
LOADED: Final = "loaded"
FAILED: Final = "failed"
CANCELLED: Final = "cancelled"

# Public API of this module: the two handle classes plus the state constants.
__all__ = [
    "Handle",
    "BatchHandle",
    "PENDING",
    "LOADING",
    "LOADED",
    "FAILED",
    "CANCELLED",
]
[docs]
class Handle:
    """Track a single asset load from request to its terminal state.

    Instances are created by :meth:`AssetServer.load`. The private
    ``_start`` / ``_set_progress`` / ``_finish_*`` methods are the state
    machine driven by the server; user code should stick to the
    read-only properties, :meth:`cancel`, :meth:`result` and the
    :attr:`completed` signal.
    """

    def __init__(self, uri: str) -> None:
        self.uri = uri
        self._state: str = PENDING
        self._progress: float = 0.0
        self._result: Any = None
        self._error: BaseException | None = None
        self._cancel_requested: bool = False
        # Set once a terminal state is reached; ``result()`` blocks on it.
        self._done = threading.Event()
        # Emitted with this handle as the argument by every ``_finish_*`` call.
        self.completed = Signal()

    # ----- read-only views ----------------------------------------------------

    @property
    def state(self) -> str:
        """Current state: ``"pending" | "loading" | "loaded" | "failed" | "cancelled"``."""
        return self._state

    @property
    def progress(self) -> float:
        """Fraction complete in 0.0–1.0. Real for HTTP loads; 0→1 in one tick for pkg/file/mem."""
        return self._progress

    @property
    def error(self) -> BaseException | None:
        """Exception recorded on failure, or ``None``. Reading it never raises."""
        return self._error

    @property
    def is_done(self) -> bool:
        """Whether the state is terminal (loaded, failed or cancelled)."""
        current = self._state
        return current == LOADED or current == FAILED or current == CANCELLED

    # ----- consumer API -------------------------------------------------------

    def cancel(self) -> None:
        """Flag the load for cancellation; the worker stops on its next progress check."""
        self._cancel_requested = True

    def result(self, timeout: float | None = None) -> Any:
        """Wait for completion, then return the value or re-raise the failure.

        Args:
            timeout: Maximum seconds to wait; ``None`` waits indefinitely.

        Returns:
            The stored result. This is ``None`` when no result was stored
            (for example after cancellation).

        Raises:
            TimeoutError: The handle did not finish within ``timeout``.
            BaseException: The exception recorded by a failed load.

        Note: when completion is dispatched via the SceneTree's per-frame
        flush, blocking here requires the flush to happen on another
        thread. In a synchronous test, prefer the ``completed`` Signal or
        run the server's flush manually.
        """
        finished = self._done.wait(timeout)
        if not finished:
            raise TimeoutError(f"AssetServer load timed out: {self.uri!r}")
        failure = self._error
        if failure is None:
            return self._result
        raise failure

    # ----- internal state machine (called by AssetServer) ---------------------

    def _start(self) -> None:
        """Mark the load as in flight."""
        self._state = LOADING

    def _set_progress(self, fraction: float) -> None:
        """Store ``fraction`` clamped to the 0.0–1.0 range."""
        capped = fraction if fraction < 1.0 else 1.0
        self._progress = capped if capped > 0.0 else 0.0

    def _settle(self, terminal_state: str) -> None:
        """Publish ``terminal_state``, wake blocked waiters, then notify listeners."""
        self._state = terminal_state
        self._done.set()
        self.completed.emit(self)

    def _finish_loaded(self, result: Any) -> None:
        """Record a successful load and its value."""
        self._result = result
        self._progress = 1.0
        self._settle(LOADED)

    def _finish_failed(self, exc: BaseException) -> None:
        """Record the exception that made the load fail."""
        self._error = exc
        self._settle(FAILED)

    def _finish_cancelled(self) -> None:
        """Record that the load ended by cancellation."""
        self._settle(CANCELLED)

    def __repr__(self) -> str:
        return f"Handle({self.uri!r}, state={self._state}, progress={self._progress:.2f})"
[docs]
class BatchHandle:
    """Aggregate of multiple :class:`Handle` objects.

    Emits :attr:`item_completed` once per child as it finishes (any
    terminal state) and :attr:`completed` once when all children are
    done. Drives a loading-screen progress bar via :attr:`progress`.
    """

    def __init__(self, handles: list[Handle]) -> None:
        # Copy so later mutation of the caller's list does not affect the batch.
        self.handles = list(handles)
        self.item_completed = Signal()  # emits child Handle
        self.completed = Signal()  # emits self
        # Guards against emitting ``completed`` more than once.
        self._fired_complete = False
        for h in self.handles:
            h.completed.connect(self._on_child_completed)
        if not self.handles:
            # An empty batch is trivially done. The signal was created a few
            # lines above, so callers that connect after construction should
            # check ``is_done`` instead of waiting for this emission.
            self._fired_complete = True
            self.completed.emit(self)

    # ----- aggregate views ----------------------------------------------------

    @property
    def total(self) -> int:
        """Number of child handles in the batch."""
        return len(self.handles)

    @property
    def completed_count(self) -> int:
        """Number of children in the ``loaded`` state (failures and cancellations excluded)."""
        return sum(1 for h in self.handles if h.state == LOADED)

    @property
    def failed_count(self) -> int:
        """Number of children in the ``failed`` state."""
        return sum(1 for h in self.handles if h.state == FAILED)

    @property
    def cancelled_count(self) -> int:
        """Number of children in the ``cancelled`` state."""
        return sum(1 for h in self.handles if h.state == CANCELLED)

    @property
    def progress(self) -> float:
        """Sum of per-handle progress / total. Partial credit for in-flight HTTP.

        An empty batch reports ``1.0``.
        """
        if not self.handles:
            return 1.0
        return sum(h.progress for h in self.handles) / len(self.handles)

    @property
    def is_done(self) -> bool:
        """True when every child is in a terminal state (always true for an empty batch)."""
        return all(h.is_done for h in self.handles)

    # ----- consumer API -------------------------------------------------------

    def cancel(self) -> None:
        """Cancel every child that has not yet finished."""
        for h in self.handles:
            if not h.is_done:
                h.cancel()

    def results(self, timeout: float | None = None) -> dict[str, Any]:
        """Block until all children are done; return ``{uri: value}``.

        Args:
            timeout: Maximum total seconds to wait for the whole batch;
                ``None`` waits indefinitely.

        Returns:
            Mapping of each child's ``uri`` to its result. Children that
            share a URI collapse into one entry (the last one wins).

        Raises:
            TimeoutError: A child was still unfinished when the shared
                deadline expired.
            BaseException: The error of the first child (in batch order)
                that failed.
        """
        # One deadline for the whole batch: passing the full ``timeout`` to
        # each child in turn would let the call block for up to
        # ``len(handles) * timeout`` seconds.
        deadline = None if timeout is None else time.monotonic() + timeout
        for h in self.handles:
            if deadline is None:
                remaining = None
            else:
                # Clamp at zero so already-finished children still return
                # their value after the deadline has passed.
                remaining = max(0.0, deadline - time.monotonic())
            h.result(timeout=remaining)
        return {h.uri: h._result for h in self.handles}

    # ----- internal -----------------------------------------------------------

    def _on_child_completed(self, child: Handle) -> None:
        """Relay a child's completion and emit ``completed`` once all are done."""
        self.item_completed.emit(child)
        if not self._fired_complete and self.is_done:
            self._fired_complete = True
            self.completed.emit(self)

    def __repr__(self) -> str:
        return (
            f"BatchHandle(total={self.total}, completed={self.completed_count}, "
            f"failed={self.failed_count}, progress={self.progress:.2f})"
        )