Squad Commander

A hybrid RTS where a low-frequency LLM commander re-prioritises a classical squad.

Tags: ai llm rts agent

Two layers, cleanly split:

  • The classical squad AI is authoritative and runs every frame. Each blue unit is a dep-free Brain (SquadUnitBrain) that reads the shared squad plan off its blackboard and returns a concrete Action this frame: advance and attack the nearest red enemy (attack), hold position (hold), or fall back to the rally point (retreat). It imports no LLM code and runs identically whether or not a commander is present.

  • The LLM commander is a stage, not the controller. Every few seconds, and entirely off the frame thread, SquadCommanderBrain asks the model for a typed battle plan (a strict stance / focus / target schema) and writes that one plan onto the shared squad blackboard, which every unit reads. A late, dropped, failed, or malformed call keeps the last good plan; the squad never stalls and never depends on the model being present.
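The "keep the last good plan" rule can be sketched without the engine. This is a minimal illustration, not the SquadCommanderBrain implementation: `parse_plan`, `apply_reply`, the `"plan"` key and the plain dict standing in for the blackboard are assumptions made for the example; only the stance / focus / target fields and the three stance names come from the description above.

```python
from __future__ import annotations

import json

STANCES = ("attack", "hold", "retreat")  # stance names from the description above


def parse_plan(text: str) -> dict | None:
    """Return a validated plan dict, or None if the reply is malformed."""
    try:
        data = json.loads(text)
    except (TypeError, ValueError):
        return None
    if not isinstance(data, dict) or data.get("stance") not in STANCES:
        return None
    if not isinstance(data.get("focus"), str):
        return None
    return {
        "stance": data["stance"],
        "focus": data["focus"],
        "target": data.get("target"),
        "rationale": str(data.get("rationale", "")),
    }


def apply_reply(board: dict, text: str | None) -> bool:
    """Write a valid plan to the board; leave the last good plan on any failure."""
    plan = parse_plan(text) if text is not None else None
    if plan is None:
        return False
    board["plan"] = plan
    return True


# Hypothetical board: a plain dict stands in for the shared squad blackboard.
board = {"plan": {"stance": "hold", "focus": "nearest", "target": None, "rationale": "default"}}
apply_reply(board, '{"stance": "attack", "focus": "nearest", "target": null}')
apply_reply(board, "not json at all")       # malformed reply: ignored
apply_reply(board, '{"stance": "charge"}')  # unknown stance: ignored
apply_reply(board, None)                    # dropped call: ignored
print(board["plan"]["stance"])
```

Because a rejected reply never touches the board, units reading it keep acting on the previous plan with no special-case code of their own.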

Drop the commander entirely and the squad still plays on the default plan: the LLM is provably optional.
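One way to see why the commander is optional: a unit only ever reads the plan with a default, so an empty board is a valid state. The sketch below assumes a plain dict as the blackboard; the contents of `DEFAULT_PLAN`, the `"plan"` key and the `decide` function are illustrative stand-ins, not the simvx definitions.

```python
# Assumed contents; the demo only states that the default behaves as a hold.
DEFAULT_PLAN = {"stance": "hold", "focus": "nearest", "target": None}


def decide(board: dict, enemy_in_sight: bool) -> str:
    """Map the shared plan to a per-frame order; never waits on a commander."""
    stance = board.get("plan", DEFAULT_PLAN)["stance"]
    if stance == "attack":
        return "attack nearest" if enemy_in_sight else "return to rally"
    if stance == "retreat":
        return "fall back to rally"
    return "hold home slot"


empty_board = {}  # no commander ever wrote a plan
commanded = {"plan": {"stance": "attack", "focus": "nearest", "target": None}}
print(decide(empty_board, enemy_in_sight=True))  # hold home slot
print(decide(commanded, enemy_in_sight=True))    # attack nearest
```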

Run offline (default: a scripted fake commander reads the battlefield and swaps the stance, after a trivial awaitable that proves the off-thread path):

uv run python examples/demos/squad_commander.py
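The off-thread path can be pictured as an event loop running in a background thread, with the frame loop submitting at most one request at a time and polling for the result. The following is a sketch of that general pattern using only the standard library; `OffThreadSlot`, `request` and `poll` are hypothetical names and do not describe the AsyncSlot API.

```python
from __future__ import annotations

import asyncio
import threading
import time
from concurrent.futures import Future


class OffThreadSlot:
    """Runs coroutines on a background event loop; at most one in flight."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        self._future: Future | None = None

    def request(self, coro_factory) -> bool:
        """Start a call unless one is already pending (coalescing)."""
        if self._future is not None:
            return False
        self._future = asyncio.run_coroutine_threadsafe(coro_factory(), self._loop)
        return True

    def poll(self):
        """Non-blocking: return the result once ready, else None."""
        if self._future is None or not self._future.done():
            return None
        future, self._future = self._future, None
        try:
            return future.result()
        except Exception:
            return None  # failed call: the caller keeps its last good plan

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_model() -> str:
    await asyncio.sleep(0.06)  # simulated latency, as in the scripted commander
    return "attack"


slot = OffThreadSlot()
started = slot.request(fake_model)    # call begins on the background loop
coalesced = slot.request(fake_model)  # refused: one call is already in flight
frames = 0
result = None
deadline = time.monotonic() + 2.0
while result is None and time.monotonic() < deadline:  # stand-in for the frame loop
    result = slot.poll()
    frames += 1
    time.sleep(0.005)
slot.close()
print(started, coalesced, result)
```

The frame loop keeps ticking while the call is pending, which is the property the trivial awaitable in the offline commander is meant to demonstrate.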

Run against a real model (records to a local cache so a re-run is deterministic):

SIMVX_LLM_BASE_URL=http://host:8000/v1 SIMVX_LLM_MODEL=your-model \
    uv run python examples/demos/squad_commander.py --live
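Recording replies so that a re-run is deterministic amounts to keying each reply by a hash of the request. A minimal sketch of that idea follows. It is not the CachingClient implementation: `ReplayCache`, its one-file-per-request layout and the synchronous `fetch` callback are assumptions chosen for brevity.

```python
from __future__ import annotations

import hashlib
import json
import tempfile
from pathlib import Path


class ReplayCache:
    """Hypothetical record/replay store: one JSON file per distinct request."""

    def __init__(self, directory: str) -> None:
        self.dir = Path(directory)
        self.dir.mkdir(parents=True, exist_ok=True)

    def _path(self, messages: list[dict]) -> Path:
        # Canonical JSON so the same request always hashes to the same key.
        key = hashlib.sha256(json.dumps(messages, sort_keys=True).encode()).hexdigest()
        return self.dir / f"{key}.json"

    def complete(self, messages: list[dict], fetch) -> str:
        path = self._path(messages)
        if path.exists():  # replay: no model call
            return json.loads(path.read_text())["text"]
        text = fetch(messages)  # record: call the model once, then store
        path.write_text(json.dumps({"text": text}))
        return text


calls = []


def fake_fetch(messages):
    calls.append(messages)
    return '{"stance": "attack"}'


cache = ReplayCache(tempfile.mkdtemp())
msgs = [{"role": "user", "content": 'Battlefield: {"enemy_count": 3}'}]
first = cache.complete(msgs, fake_fetch)
second = cache.complete(msgs, fake_fetch)
print(first == second, len(calls))
```

Replay of this kind is only deterministic to the extent that the same requests recur; a request the cache has not seen still reaches the model.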

Controls: Space spawns a wave of red enemies (watch the commander switch to attack), R forces a manual retreat order, Esc quits.

Source

"""Squad Commander: a hybrid RTS where a low-frequency LLM commander re-prioritises a classical squad.

Two layers, cleanly split:

- The **classical squad AI** is authoritative and runs **every frame**. Each blue
  unit is a dep-free ``Brain`` (``SquadUnitBrain``) that reads the shared squad
  plan off its blackboard and returns a concrete Action this frame: advance and
  attack the nearest red enemy (``attack``), hold position (``hold``), or fall back
  to the rally point (``retreat``). It imports no LLM code and runs identically
  whether or not a commander is present.

- The **LLM commander** is a stage, not the controller. Every few seconds, and
  entirely off the frame thread, ``SquadCommanderBrain`` asks the model for a typed
  battle plan (a strict ``stance`` / ``focus`` / ``target`` schema) and writes that
  one plan onto the *shared squad blackboard*, which every unit reads. A late,
  dropped, failed, or malformed call keeps the last good plan; the squad never
  stalls and never depends on the model being present.

Drop the commander entirely and the squad still plays on the default plan: the LLM
is provably optional.

Run offline (default: a scripted fake commander reads the battlefield and swaps the
stance, after a trivial awaitable that proves the off-thread path):

    uv run python examples/demos/squad_commander.py

Run against a real model (records to a local cache so a re-run is deterministic):

    SIMVX_LLM_BASE_URL=http://host:8000/v1 SIMVX_LLM_MODEL=your-model \
        uv run python examples/demos/squad_commander.py --live

Controls: Space spawns a wave of red enemies (watch the commander switch to attack),
R forces a manual retreat order, Esc quits.

# /// simvx
# tags = ["ai", "llm", "rts", "agent"]
# ///
"""

from __future__ import annotations

import asyncio
import json
import sys

from simvx.ai import DEFAULT_PLAN, PLAN_KEY, CachingClient, OpenAICompatibleClient, SquadCommanderBrain
from simvx.ai.client import LLMClient, LLMResponse
from simvx.core import AnchorPreset, Input, InputMap, Key, Label, Node2D, Vec2
from simvx.core.ai import Action, ActionResult, AgentNode, AIContext, Blackboard, Brain
from simvx.graphics import App

WIDTH, HEIGHT = 1000, 700
RALLY = Vec2(140, HEIGHT / 2)
CACHE_DIR = "/tmp/simvx_squad_commander_cache"

UNIT_SPEED = 130.0
UNIT_RADIUS = 12.0
ATTACK_RANGE = 60.0
ENEMY_RADIUS = 12.0


class ScriptedCommanderClient(LLMClient):
    """An offline fake commander: a trivial awaitable, then a plan for the battlefield.

    Stands in for a real model so the demo runs with no network, while still
    exercising the full async path (the ``await`` runs on the AsyncSlot loop, never
    the frame thread), so non-blocking / coalescing / degrade behaviour is identical.
    """

    async def complete(self, messages, **kwargs) -> LLMResponse:
        await asyncio.sleep(0.06)  # simulate model latency, off the frame thread
        facts = json.loads(messages[-1]["content"].split("Battlefield: ", 1)[1])
        enemies = int(facts.get("enemy_count", 0))
        forced = facts.get("forced_retreat", False)
        if forced:
            plan = {"stance": "retreat", "focus": "nearest", "target": None, "rationale": "manual fallback"}
        elif enemies == 0:
            plan = {"stance": "hold", "focus": "nearest", "target": None, "rationale": "no contacts, hold rally"}
        elif enemies >= 4:
            plan = {"stance": "retreat", "focus": "nearest", "target": None, "rationale": "overwhelmed, fall back"}
        else:
            plan = {"stance": "attack", "focus": "nearest", "target": None, "rationale": "engage, we have numbers"}
        return LLMResponse(text=json.dumps(plan))


def _nearest_enemy(ctx: AIContext, pos: Vec2):
    enemies = ctx.blackboard.get("enemies", ())
    best = None
    best_d = float("inf")
    for enemy in enemies:
        if not enemy.alive:
            continue
        d = float((enemy.position - pos).length())
        if d < best_d:
            best_d, best = d, enemy
    return best, best_d


class MoveToward(Action):
    """Step the unit toward a target point at unit speed (authoritative, every frame)."""

    def __init__(self, target: Vec2) -> None:
        self.target = target

    def execute(self, ctx: AIContext) -> ActionResult:
        unit = ctx.agent
        delta = self.target - unit.position
        dist = float(delta.length())
        if dist > 1.0:
            unit.position = unit.position + delta * (min(UNIT_SPEED * ctx.dt, dist) / dist)
        return ActionResult.success("move")


class AttackEnemy(Action):
    """Damage the nearest enemy if in range, else close the distance."""

    def execute(self, ctx: AIContext) -> ActionResult:
        unit = ctx.agent
        enemy, dist = _nearest_enemy(ctx, unit.position)
        if enemy is None:
            unit.firing_at = None  # no target left: clear the stale tracer line
            return MoveToward(RALLY).execute(ctx)
        if dist <= ATTACK_RANGE:
            enemy.hp -= 90.0 * ctx.dt
            unit.firing_at = enemy.position.copy()
            return ActionResult.success("attack")
        unit.firing_at = None
        return MoveToward(enemy.position).execute(ctx)


class SquadUnitBrain(Brain):
    """Dep-free classical unit AI: reads the shared plan every frame, returns an Action.

    Authoritative for unit actions and imports no LLM code. With no plan on the
    board (no commander present) ``get`` falls back to ``DEFAULT_PLAN`` -> a sane hold.
    """

    def decide(self, ctx: AIContext) -> Action | None:
        plan = ctx.blackboard.get(PLAN_KEY, DEFAULT_PLAN)
        stance = plan["stance"]
        if stance == "attack":
            return AttackEnemy()
        if stance == "retreat":
            ctx.agent.firing_at = None
            return MoveToward(RALLY)
        ctx.agent.firing_at = None
        return MoveToward(ctx.agent.home)  # hold: settle on the unit's home slot


class Unit(AgentNode):
    """A blue squad unit driven by the classical ``SquadUnitBrain`` every frame."""

    def __init__(self, home: Vec2, squad: Blackboard, **kwargs) -> None:
        super().__init__(brain=SquadUnitBrain(), squad=squad, **kwargs)
        self.home = home
        self.position = home.copy()
        self.firing_at: Vec2 | None = None

    def on_draw(self, renderer) -> None:
        renderer.draw_circle(self.position, UNIT_RADIUS, colour=(0.3, 0.6, 1.0, 1.0), filled=True)
        if self.firing_at is not None:
            renderer.draw_line(self.position, self.firing_at, colour=(1.0, 0.9, 0.3, 0.8), thickness=2.0)


class Enemy:
    """A red target. Plain object (not a node): the demo draws and culls them."""

    def __init__(self, position: Vec2) -> None:
        self.position = position
        self.hp = 100.0

    @property
    def alive(self) -> bool:
        return self.hp > 0.0


class CommanderHUD(Node2D):
    """Bottom HUD showing the live plan (last good if a call is in flight / failed)."""

    def __init__(self, squad: Blackboard, **kwargs) -> None:
        super().__init__(**kwargs)
        self.squad = squad
        self.label: Label | None = None

    def on_ready(self) -> None:
        label = Label("...", name="Plan")
        label.set_anchor_preset(AnchorPreset.BOTTOM_WIDE)
        label.margin_left = 20
        label.margin_right = 20
        label.margin_top = -64
        label.margin_bottom = -16
        label.font_size = 22.0
        label.alignment = "center"
        self.add_child(label)
        self.label = label

    def on_update(self, dt: float) -> None:
        if self.label is None:
            return
        plan = self.squad.get(PLAN_KEY, DEFAULT_PLAN)
        self.label.text = f"COMMANDER  stance: {plan['stance'].upper()}   \"{plan['rationale']}\""


class Battle(Node2D):
    """Root: owns the shared squad board, spawns units + a low-frequency commander."""

    def __init__(self, client: LLMClient | None = None, **kwargs) -> None:
        super().__init__(**kwargs)
        # No-arg construction runs offline so the screenshot walker and web export
        # (both instantiate the root with no args) get the scripted commander;
        # main() passes a real client for --live.
        self._client = client if client is not None else ScriptedCommanderClient()
        self.squad = Blackboard()
        self.enemies: list[Enemy] = []
        self._forced_retreat_t = 0.0
        self.commander: AgentNode | None = None
        self.title: Label | None = None

    def on_ready(self) -> None:
        InputMap.add_action("spawn_wave", [Key.SPACE])
        InputMap.add_action("retreat", [Key.R])
        InputMap.add_action("quit", [Key.ESCAPE])

        # Shared squad facts the commander reads (allowlisted slice, not the tree).
        self.squad.set("enemies", self.enemies)
        self.squad.set("enemy_count", 0)
        self.squad.set("forced_retreat", False)

        # Classical units: each gets squad.child(), reads the plan every frame.
        for i in range(5):
            home = Vec2(220, 160 + i * 95)
            self.add_child(Unit(home, self.squad, name=f"Unit{i}"))

        # The low-frequency LLM commander. Its own AgentNode ticks every frame, but
        # the brain self-throttles to ``period`` seconds and writes to the squad board.
        self.commander = AgentNode(
            brain=SquadCommanderBrain(
                self._client,
                self.squad,
                facts=["enemy_count", "forced_retreat"],
                period=2.5,
            ),
            name="Commander",
        )
        self.add_child(self.commander)
        self.add_child(CommanderHUD(self.squad, name="HUD"))

        title = Label("Squad Commander  -  Space: spawn wave   R: retreat   Esc: quit", name="Title")
        title.set_anchor_preset(AnchorPreset.CENTER_TOP)
        title.margin_left = -320
        title.margin_right = 320
        title.margin_top = 16
        title.margin_bottom = 40
        title.font_size = 18.0
        title.alignment = "center"
        self.add_child(title)
        self.title = title

    def on_update(self, dt: float) -> None:
        if Input.is_action_just_pressed("spawn_wave"):
            for i in range(3):
                self.enemies.append(Enemy(Vec2(WIDTH - 120, 180 + i * 130)))
        if Input.is_action_just_pressed("retreat"):
            self._forced_retreat_t = 3.0
        if Input.is_action_just_pressed("quit"):
            self.app.quit()

        # Enemies drift toward the squad and are culled when dead.
        for enemy in self.enemies:
            enemy.position.x -= 25.0 * dt
        self.enemies[:] = [e for e in self.enemies if e.alive and e.position.x > 60]

        # Update the authoritative facts the commander reads (every frame).
        self._forced_retreat_t = max(0.0, self._forced_retreat_t - dt)
        self.squad.set("enemy_count", len(self.enemies))
        self.squad.set("forced_retreat", self._forced_retreat_t > 0.0)

    def on_draw(self, renderer) -> None:
        renderer.draw_circle(RALLY, 18, colour=(0.3, 0.6, 1.0, 0.25), filled=True)
        for enemy in self.enemies:
            renderer.draw_circle(enemy.position, ENEMY_RADIUS, colour=(1.0, 0.35, 0.35, 1.0), filled=True)
            renderer.draw_rect(
                (enemy.position.x - 14, enemy.position.y - 22),
                (28 * max(0.0, enemy.hp) / 100.0, 4),
                colour=(0.4, 1.0, 0.4, 1.0),
                filled=True,
            )


def _build_client(live: bool) -> LLMClient:
    if not live:
        return ScriptedCommanderClient()
    return CachingClient(OpenAICompatibleClient.from_env(), CACHE_DIR, mode="auto")


def main() -> None:
    live = "--live" in sys.argv
    app = App(title="SimVX Squad Commander", width=WIDTH, height=HEIGHT)
    app.run(Battle(_build_client(live), name="Battle"))


if __name__ == "__main__":
    main()
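
The scripted commander's decision rule in the listing can be checked in isolation. The helper below restates its thresholds as a pure function; `scripted_stance` is a name introduced here for illustration and is not part of the demo.

```python
def scripted_stance(enemy_count: int, forced_retreat: bool = False) -> str:
    """Mirror of the ScriptedCommanderClient thresholds, without the JSON plumbing."""
    if forced_retreat:
        return "retreat"
    if enemy_count == 0:
        return "hold"
    if enemy_count >= 4:
        return "retreat"
    return "attack"


for count in (0, 3, 6):  # no contacts, one wave of three, two waves
    print(count, scripted_stance(count))
print(3, scripted_stance(3, forced_retreat=True))
```

One wave of three puts the scripted commander in attack; spawning a second wave before the first is cleared raises the count to six and it orders a retreat, as does pressing R while the forced-retreat timer is running.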