"""Frame-loop driver that ticks any :class:`Brain` from a node's ``on_update``.

This is the dependency-free half of the AI driving layer: it ticks *any* core
``Brain`` (a hand-rolled behaviour tree, state machine, utility scorer, or the
LLM brain in ``simvx.ai``) and never imports ``simvx.ai`` or ``httpx``. Classical
brains are first-class here with zero LLM dependency.

Three primitives, smallest to largest:

- :func:`build_ai_context` - the free helper that snapshots a node + tree into
  an :class:`AIContext` for one tick.
- :class:`BrainRunner` - owns ``brain + blackboard + tick_interval``; its
  :meth:`BrainRunner.step` ticks at a cadence, passing the *elapsed since the
  last tick* (an accumulator), not the raw frame ``dt``, so brains see correct
  deltas regardless of tick rate. Usable from any existing ``Node`` subclass.
- :class:`AgentNode` - the canonical batteries-included node that builds a
  Blackboard in ``on_ready``, calls ``runner.step`` in ``on_update``, and tears
  the brain down in ``on_exit_tree``.

The driver calls exactly one method per tick: ``brain.tick(ctx)``. It never calls
``perceive`` / ``decide`` / ``act`` directly.
"""
from __future__ import annotations
from typing import Any
from ..node import Node
from ..properties import Property
from .blackboard import Blackboard
from .brain import AIContext, Brain
def build_ai_context(node: Node, dt: float, blackboard: Blackboard) -> AIContext:
    """Build the per-tick :class:`AIContext` for a node that is in the tree.

    The returned context carries ``node`` as ``agent``, ``node.tree`` as
    ``world``, that tree's ``now`` value as ``now``, and the supplied ``dt``
    and ``blackboard`` unchanged.

    Raises:
        RuntimeError: if ``node.tree`` is ``None``. Calling this on an
            out-of-tree node is treated as a programming error and reported
            immediately instead of handing the brain a context with no clock.
    """
    world = node.tree
    if world is not None:
        return AIContext(agent=node, blackboard=blackboard, dt=dt, now=world.now, world=world)
    # No tree means no clock to read, so fail loudly here.
    raise RuntimeError("build_ai_context requires an in-tree node (call from on_ready/on_update, not __init__)")
class BrainRunner:
    """Tick a :class:`Brain` at a chosen cadence from a node's per-frame update.

    With ``tick_interval == 0`` the brain is ticked on every :meth:`step` call
    and receives the frame ``dt`` as-is. With ``tick_interval > 0`` frame
    deltas are summed in an accumulator; once the sum reaches the interval the
    brain is ticked with that whole accumulated time as its ``dt`` and the
    accumulator restarts from zero.
    """

    def __init__(self, brain: Brain, blackboard: Blackboard, *, tick_interval: float = 0.0) -> None:
        """Store the brain and blackboard and start with an empty accumulator.

        A negative ``tick_interval`` is clamped to ``0.0`` (tick every frame).
        """
        self._accum = 0.0
        self.tick_interval = max(0.0, tick_interval)
        self.blackboard = blackboard
        self.brain = brain

    def step(self, node: Node, dt: float) -> None:
        """Feed one frame's ``dt`` into the cadence; tick the brain when due."""
        interval = self.tick_interval
        if interval <= 0.0:
            # Every-frame cadence: the brain sees the raw frame delta.
            delta = dt
        else:
            pending = self._accum + dt
            if pending < interval:
                # Not due yet: remember the time banked so far.
                self._accum = pending
                return
            # Due: hand over everything accumulated and restart the count
            # before ticking, so the accumulator is clean even if the tick raises.
            self._accum = 0.0
            delta = pending
        self.brain.tick(build_ai_context(node, delta, self.blackboard))
def _teardown_brain(brain: Brain | None) -> None:
"""Duck-typed teardown so core never imports the LLM brain.
A classical brain has no resources; an ``LLMBrain`` exposes ``close`` to shut
its :class:`AsyncSlot`. We call whichever it has, keeping core LLM-free.
"""
if brain is None:
return
close = getattr(brain, "close", None)
if callable(close):
close()
class AgentNode(Node):
    """Node that owns one :class:`Brain` and ticks it from ``on_update``.

    Assign ``brain`` via the constructor or before the node enters the tree.
    In ``on_ready`` the node creates its :class:`Blackboard` (a ``child()`` of
    ``squad`` when one was given, otherwise a fresh board) and a
    :class:`BrainRunner` using the current ``tick_interval``. ``on_update``
    forwards each frame to the runner, and ``on_exit_tree`` tears the brain
    down. When ``brain`` is ``None`` no runner is created and the node does
    nothing, which lets subclasses attach a brain conditionally.
    """

    tick_interval = Property(0.0, range=(0.0, 10.0))

    def __init__(self, *, brain: Brain | None = None, squad: Blackboard | None = None, **kwargs: Any) -> None:
        """Record the brain and optional squad board; the runner is built later in ``on_ready``."""
        super().__init__(**kwargs)
        self.brain: Brain | None = brain
        self.squad = squad
        # Both stay ``None`` until ``on_ready`` runs with a brain present.
        self.blackboard: Blackboard | None = None
        self.runner: BrainRunner | None = None

    def on_ready(self) -> None:
        """Create the blackboard and runner, unless no brain is set."""
        if self.brain is not None:
            squad = self.squad
            if squad is None:
                board = Blackboard()
            else:
                # Derive this agent's board from the squad board.
                board = squad.child()
            self.blackboard = board
            self.runner = BrainRunner(self.brain, board, tick_interval=self.tick_interval)

    def on_update(self, dt: float) -> None:
        """Advance the runner by ``dt`` when a runner exists and the node is in a tree."""
        runner = self.runner
        if runner is not None and self.tree is not None:
            runner.step(self, dt)

    def on_exit_tree(self) -> None:
        """Tear down the brain (a no-op when ``brain`` is ``None``)."""
        _teardown_brain(self.brain)