Source code for simvx.core.ai.async_slot

"""AsyncSlot: a non-blocking bridge from the synchronous frame loop to one awaitable.

The engine frame loop is purely synchronous (``SceneTree.tick`` drives generator
coroutines with ``gen.send(dt)``); there is no asyncio event loop on the frame
thread, and a blocking ``await`` there would freeze the game. ``AsyncSlot`` owns
one daemon thread running one long-lived asyncio loop. The frame thread only ever
calls :meth:`submit` (returns immediately via ``run_coroutine_threadsafe``) and
:meth:`poll` (non-blocking; reads a finished result or ``None``).

This mirrors two precedents the engine already ships: ``AssetServer.flush``
(snapshot-under-lock then dispatch) and ``GitStatusProvider`` (daemon thread +
lock-guarded poll + self-disable-and-log on failure). The slot is LLM-agnostic:
it bridges *any* awaitable, so the hybrid commander and node-gen capstone reuse it
unchanged.

Policy:
  - **Coalesce**: at most one in-flight future. :meth:`submit` is a no-op while a
    call is pending (the dropped coroutine factory is never invoked, so there is
    no "coroutine was never awaited" warning).
  - **Graceful degrade**: :meth:`poll` logs the *specific* failure and returns
    ``None`` so the caller keeps its last good value, never a bare ``except`` that
    hides bugs.
  - **Teardown**: :meth:`close` stops the loop and joins the daemon thread; it is
    idempotent. The thread is a daemon so a crashed game still exits.
"""

from __future__ import annotations

import asyncio
import logging
import threading
from collections.abc import Callable, Coroutine
from concurrent.futures import Future
from typing import Any

log = logging.getLogger("simvx.core.ai.async_slot")

#: A zero-arg factory that creates the coroutine. Indirection means the coroutine
#: is only created when actually submitted to the loop, so a coalesced (dropped)
#: submit never builds an un-awaited coroutine.
type CoroFactory[T] = Callable[[], Coroutine[Any, Any, T]]


[docs] class AsyncSlot[T]: """One daemon asyncio loop bridging a single in-flight awaitable to the frame.""" def __init__(self, *, thread_name: str = "simvx-async-slot") -> None: self._lock = threading.Lock() self._inflight: Future[T] | None = None self._closed = False self._loop = asyncio.new_event_loop() self._thread = threading.Thread(target=self._run_loop, name=thread_name, daemon=True) self._ready = threading.Event() self._thread.start() self._ready.wait() def _run_loop(self) -> None: asyncio.set_event_loop(self._loop) self._loop.call_soon(self._ready.set) self._loop.run_forever() # Drain and close once run_forever returns (loop.stop scheduled by close()). self._loop.close()
[docs] @property def pending(self) -> bool: """True if a submitted call is still running (used by callers to gate cadence).""" with self._lock: return self._inflight is not None and not self._inflight.done()
[docs] def submit(self, factory: CoroFactory[T]) -> bool: """Submit a coroutine factory to the loop, coalescing to one in-flight call. Returns ``True`` if accepted (no call was pending), ``False`` if dropped because a call is already in flight or the slot is closed. The dropped factory is never invoked, so no coroutine is created and abandoned. """ with self._lock: if self._closed: return False if self._inflight is not None and not self._inflight.done(): return False # The coroutine is created ON the loop thread (via _start), so nothing # is ever created-but-not-awaited if we tear down before it runs, and # httpx.AsyncClient is created/awaited on the loop it is bound to. fut: Future[T] = Future() self._inflight = fut self._loop.call_soon_threadsafe(self._start, factory, fut) return True
def _start(self, factory: CoroFactory[T], fut: Future[T]) -> None: """Create and drive the coroutine on the loop thread, resolving ``fut``.""" if fut.cancelled(): return task = self._loop.create_task(factory()) def _done(t: asyncio.Task[T]) -> None: if fut.cancelled(): return exc = t.exception() if exc is not None: fut.set_exception(exc) else: fut.set_result(t.result()) task.add_done_callback(_done)
[docs] def poll(self) -> T | None: """Return a freshly completed result, or ``None`` if nothing new / it failed. Non-blocking: never awaits. On success the slot is cleared and the value returned. On exception the failure is logged and ``None`` is returned so the caller retains its last good value. While a call is still pending, or nothing has been submitted, returns ``None``. """ with self._lock: fut = self._inflight if fut is None or not fut.done(): return None self._inflight = None try: return fut.result() except Exception: # noqa: BLE001 - degrade gracefully, but log the specific cause log.warning("AsyncSlot call failed; keeping last good value", exc_info=True) return None
[docs] def cancel(self) -> None: """Cancel any in-flight call without tearing down the loop.""" with self._lock: fut = self._inflight self._inflight = None if fut is not None: fut.cancel()
[docs] def close(self) -> None: """Cancel in-flight work, stop the loop, and join the daemon thread (idempotent).""" with self._lock: if self._closed: return self._closed = True fut = self._inflight self._inflight = None if fut is not None: fut.cancel() # Cancel and await any pending tasks ON the loop thread so no coroutine is # left un-awaited, then stop the loop. self._loop.call_soon_threadsafe(self._shutdown) self._thread.join(timeout=5.0)
def _shutdown(self) -> None: tasks = asyncio.all_tasks(self._loop) for task in tasks: task.cancel() self._loop.stop()