Source code for simvx.core.ai.async_slot
"""AsyncSlot: a non-blocking bridge from the synchronous frame loop to one awaitable.
The engine frame loop is purely synchronous (``SceneTree.tick`` drives generator
coroutines with ``gen.send(dt)``); there is no asyncio event loop on the frame
thread, and a blocking ``await`` there would freeze the game. ``AsyncSlot`` owns
one daemon thread running one long-lived asyncio loop. The frame thread only ever
calls :meth:`submit` (returns immediately via ``run_coroutine_threadsafe``) and
:meth:`poll` (non-blocking; reads a finished result or ``None``).
This mirrors two precedents the engine already ships: ``AssetServer.flush``
(snapshot-under-lock then dispatch) and ``GitStatusProvider`` (daemon thread +
lock-guarded poll + self-disable-and-log on failure). The slot is LLM-agnostic:
it bridges *any* awaitable, so the hybrid commander and node-gen capstone reuse it
unchanged.
Policy:
- **Coalesce**: at most one in-flight future. :meth:`submit` is a no-op while a
call is pending (the dropped coroutine factory is never invoked, so there is
no "coroutine was never awaited" warning).
- **Graceful degrade**: :meth:`poll` logs the *specific* failure and returns
``None`` so the caller keeps its last good value, never a bare ``except`` that
hides bugs.
- **Teardown**: :meth:`close` stops the loop and joins the daemon thread; it is
idempotent. The thread is a daemon so a crashed game still exits.
"""
from __future__ import annotations
import asyncio
import logging
import threading
from collections.abc import Callable, Coroutine
from concurrent.futures import Future
from typing import Any
log = logging.getLogger("simvx.core.ai.async_slot")
#: A zero-arg factory that creates the coroutine. Indirection means the coroutine
#: is only created when actually submitted to the loop, so a coalesced (dropped)
#: submit never builds an un-awaited coroutine.
type CoroFactory[T] = Callable[[], Coroutine[Any, Any, T]]
[docs]
class AsyncSlot[T]:
"""One daemon asyncio loop bridging a single in-flight awaitable to the frame."""
def __init__(self, *, thread_name: str = "simvx-async-slot") -> None:
self._lock = threading.Lock()
self._inflight: Future[T] | None = None
self._closed = False
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(target=self._run_loop, name=thread_name, daemon=True)
self._ready = threading.Event()
self._thread.start()
self._ready.wait()
def _run_loop(self) -> None:
asyncio.set_event_loop(self._loop)
self._loop.call_soon(self._ready.set)
self._loop.run_forever()
# Drain and close once run_forever returns (loop.stop scheduled by close()).
self._loop.close()
[docs]
@property
def pending(self) -> bool:
"""True if a submitted call is still running (used by callers to gate cadence)."""
with self._lock:
return self._inflight is not None and not self._inflight.done()
[docs]
def submit(self, factory: CoroFactory[T]) -> bool:
"""Submit a coroutine factory to the loop, coalescing to one in-flight call.
Returns ``True`` if accepted (no call was pending), ``False`` if dropped
because a call is already in flight or the slot is closed. The dropped
factory is never invoked, so no coroutine is created and abandoned.
"""
with self._lock:
if self._closed:
return False
if self._inflight is not None and not self._inflight.done():
return False
# The coroutine is created ON the loop thread (via _start), so nothing
# is ever created-but-not-awaited if we tear down before it runs, and
# httpx.AsyncClient is created/awaited on the loop it is bound to.
fut: Future[T] = Future()
self._inflight = fut
self._loop.call_soon_threadsafe(self._start, factory, fut)
return True
def _start(self, factory: CoroFactory[T], fut: Future[T]) -> None:
"""Create and drive the coroutine on the loop thread, resolving ``fut``."""
if fut.cancelled():
return
task = self._loop.create_task(factory())
def _done(t: asyncio.Task[T]) -> None:
if fut.cancelled():
return
exc = t.exception()
if exc is not None:
fut.set_exception(exc)
else:
fut.set_result(t.result())
task.add_done_callback(_done)
[docs]
def poll(self) -> T | None:
"""Return a freshly completed result, or ``None`` if nothing new / it failed.
Non-blocking: never awaits. On success the slot is cleared and the value
returned. On exception the failure is logged and ``None`` is returned so
the caller retains its last good value. While a call is still pending, or
nothing has been submitted, returns ``None``.
"""
with self._lock:
fut = self._inflight
if fut is None or not fut.done():
return None
self._inflight = None
try:
return fut.result()
except Exception: # noqa: BLE001 - degrade gracefully, but log the specific cause
log.warning("AsyncSlot call failed; keeping last good value", exc_info=True)
return None
[docs]
def cancel(self) -> None:
"""Cancel any in-flight call without tearing down the loop."""
with self._lock:
fut = self._inflight
self._inflight = None
if fut is not None:
fut.cancel()
[docs]
def close(self) -> None:
"""Cancel in-flight work, stop the loop, and join the daemon thread (idempotent)."""
with self._lock:
if self._closed:
return
self._closed = True
fut = self._inflight
self._inflight = None
if fut is not None:
fut.cancel()
# Cancel and await any pending tasks ON the loop thread so no coroutine is
# left un-awaited, then stop the loop.
self._loop.call_soon_threadsafe(self._shutdown)
self._thread.join(timeout=5.0)
def _shutdown(self) -> None:
tasks = asyncio.all_tasks(self._loop)
for task in tasks:
task.cancel()
self._loop.stop()