# Source code for simvx.core.ai.agent_node

"""Frame-loop driver that ticks any :class:`Brain` from a node's ``on_update``.

This is the dependency-free half of the AI driving layer: it ticks *any* core
``Brain`` (a hand-rolled behaviour tree, state machine, utility scorer, or the
LLM brain in ``simvx.ai``) and never imports ``simvx.ai`` or ``httpx``. Classical
brains are first-class here with zero LLM dependency.

Three primitives, smallest to largest:

  - :func:`build_ai_context` - the free helper that snapshots a node + tree into
    an :class:`AIContext` for one tick.
  - :class:`BrainRunner` - owns ``brain + blackboard + tick_interval``; its
    :meth:`BrainRunner.step` ticks at a cadence, passing the *elapsed since the
    last tick* (an accumulator), not the raw frame ``dt``, so brains see correct
    deltas regardless of tick rate. Usable from any existing ``Node`` subclass.
  - :class:`AgentNode` - the canonical batteries-included node that builds a
    Blackboard in ``on_ready``, calls ``runner.step`` in ``on_update``, and tears
    the brain down in ``on_exit_tree``.

The driver calls exactly one method per tick: ``brain.tick(ctx)``. It never calls
``perceive`` / ``decide`` / ``act`` directly.
"""

from __future__ import annotations

from typing import Any

from ..node import Node
from ..properties import Property
from .blackboard import Blackboard
from .brain import AIContext, Brain


def build_ai_context(node: Node, dt: float, blackboard: Blackboard) -> AIContext:
    """Build the per-tick :class:`AIContext` for a node that is in the tree.

    The context is populated as follows: ``agent`` is ``node`` itself,
    ``world`` is the node's :class:`SceneTree` (``node.tree``), ``now`` is that
    tree's accumulated sim clock (``tree.now``), and ``dt`` / ``blackboard``
    are forwarded unchanged.

    Args:
        node: The node the brain is driving; must currently be in a tree.
        dt: The delta the brain should see for this tick.
        blackboard: The blackboard handed to the brain for this tick.

    Returns:
        A fresh :class:`AIContext` describing this tick.

    Raises:
        RuntimeError: If ``node.tree`` is ``None`` (the node has not entered
            the tree yet). This is treated as a programming error and raised
            eagerly instead of silently feeding the brain ``now=0``.
    """
    scene_tree = node.tree
    if scene_tree is not None:
        return AIContext(
            agent=node,
            blackboard=blackboard,
            dt=dt,
            now=scene_tree.now,
            world=scene_tree,
        )
    # Out-of-tree nodes have no clock and no world to snapshot.
    raise RuntimeError("build_ai_context requires an in-tree node (call from on_ready/on_update, not __init__)")
class BrainRunner:
    """Tick a :class:`Brain` at a configurable cadence from a per-frame update.

    With ``tick_interval == 0`` the brain is ticked on every call to
    :meth:`step` and receives the raw frame ``dt``. With ``tick_interval > 0``
    frame deltas are accumulated and the brain is ticked once the accumulated
    time reaches ``tick_interval``; the ``dt`` it then sees is the whole
    accumulated time since the previous tick, not the raw frame ``dt``.
    """

    def __init__(self, brain: Brain, blackboard: Blackboard, *, tick_interval: float = 0.0) -> None:
        """Store the brain and blackboard and start with an empty accumulator.

        Args:
            brain: The brain whose ``tick`` method is driven by :meth:`step`.
            blackboard: The blackboard passed into every tick's context.
            tick_interval: Seconds between ticks; negative values are clamped
                to ``0.0`` (tick every frame).
        """
        self.brain = brain
        self.blackboard = blackboard
        self.tick_interval = max(0.0, tick_interval)
        # Time gathered since the last tick; only used when tick_interval > 0.
        self._accum = 0.0

    def step(self, node: Node, dt: float) -> None:
        """Advance the cadence by ``dt`` and tick the brain if it is due.

        Args:
            node: The in-tree node the context is built from.
            dt: The frame delta reported by the caller's update.
        """
        interval = self.tick_interval
        if interval <= 0.0:
            # Every-frame cadence: the brain sees the raw frame delta.
            delta = dt
        else:
            self._accum += dt
            if self._accum < interval:
                return
            # Due: hand over everything accumulated and start a new window
            # before the brain runs.
            delta, self._accum = self._accum, 0.0
        self.brain.tick(build_ai_context(node, delta, self.blackboard))
def _teardown_brain(brain: Brain | None) -> None:
    """Release a brain's resources through a duck-typed ``close`` call.

    Core never imports the LLM brain, so no concrete type is checked here.
    A classical brain has nothing to release; an ``LLMBrain`` exposes ``close``
    to shut its :class:`AsyncSlot`. If the brain has a callable ``close``
    attribute it is invoked with no arguments; otherwise nothing happens.

    Args:
        brain: The brain to tear down, or ``None`` for a no-op.
    """
    closer = None if brain is None else getattr(brain, "close", None)
    if callable(closer):
        closer()
class AgentNode(Node):
    """Node that drives a single :class:`Brain`, per frame or at ``tick_interval``.

    Assign ``self.brain`` in ``__init__`` or at any point before the node
    enters the tree. In ``on_ready`` the node creates its :class:`Blackboard`
    (a ``child()`` of the squad board when one was given) and a
    :class:`BrainRunner`; ``on_update`` steps that runner; ``on_exit_tree``
    tears the brain down. When ``self.brain`` is ``None`` the node does
    nothing, which lets subclasses attach a brain conditionally.
    """

    tick_interval = Property(0.0, range=(0.0, 10.0))

    def __init__(self, *, brain: Brain | None = None, squad: Blackboard | None = None, **kwargs: Any) -> None:
        """Record the brain and optional squad board; defer the rest to ``on_ready``.

        Args:
            brain: The brain to drive, or ``None`` for an inert node.
            squad: Optional shared blackboard this node's board is derived from.
            **kwargs: Forwarded to the ``Node`` constructor.
        """
        super().__init__(**kwargs)
        self.brain: Brain | None = brain
        self.squad = squad
        # Both are created in on_ready and stay None for a brainless node.
        self.blackboard: Blackboard | None = None
        self.runner: BrainRunner | None = None

    def on_ready(self) -> None:
        """Create the blackboard and runner, unless no brain is set."""
        if self.brain is None:
            return
        if self.squad is not None:
            self.blackboard = self.squad.child()
        else:
            self.blackboard = Blackboard()
        self.runner = BrainRunner(self.brain, self.blackboard, tick_interval=self.tick_interval)

    def on_update(self, dt: float) -> None:
        """Step the runner by ``dt`` when a runner exists and the node is in a tree."""
        if self.runner is not None and self.tree is not None:
            self.runner.step(self, dt)

    def on_exit_tree(self) -> None:
        """Tear the brain down when the node leaves the tree."""
        _teardown_brain(self.brain)