Squad Commander¶
a hybrid RTS where a low-frequency LLM commander re-prioritises a classical squad.
▶ Run in browser | Tags: ai llm rts agent
Two layers, cleanly split:
The classical squad AI is authoritative and runs every frame. Each blue unit is a dep-free
Brain (SquadUnitBrain) that reads the shared squad plan off its blackboard and returns a concrete Action this frame: advance and attack the nearest red enemy (attack), hold position (hold), or fall back to the rally point (retreat). It imports no LLM code and runs identically whether or not a commander is present.
The LLM commander is a stage, not the controller. A few seconds apart and entirely off the frame thread,
SquadCommanderBrain asks the model for a typed battle plan (a strict stance / focus / target schema) and writes that one plan onto the shared squad blackboard, which every unit reads. A late / dropped / failed / malformed call keeps the last good plan; the squad never stalls and never depends on the model being present.
Drop the commander entirely and the squad still plays on the default plan: the LLM is provably optional.
Run offline (default: a scripted fake commander reads the battlefield and swaps the stance, after a trivial awaitable that proves the off-thread path):
uv run python examples/demos/squad_commander.py
Run against a real model (records to a local cache so a re-run is deterministic):
SIMVX_LLM_BASE_URL=http://host:8000/v1 SIMVX_LLM_MODEL=your-model uv run python examples/demos/squad_commander.py --live
Controls: Space spawns a wave of red enemies (watch the commander switch to attack), R forces a manual retreat order, Esc quits.
Source¶
1"""Squad Commander: a hybrid RTS where a low-frequency LLM commander re-prioritises a classical squad.
2
3Two layers, cleanly split:
4
5- The **classical squad AI** is authoritative and runs **every frame**. Each blue
6 unit is a dep-free ``Brain`` (``SquadUnitBrain``) that reads the shared squad
7 plan off its blackboard and returns a concrete Action this frame: advance and
8 attack the nearest red enemy (``attack``), hold position (``hold``), or fall back
9 to the rally point (``retreat``). It imports no LLM code and runs identically
10 whether or not a commander is present.
11
12- The **LLM commander** is a stage, not the controller. A few seconds apart and
13 entirely off the frame thread, ``SquadCommanderBrain`` asks the model for a typed
14 battle plan (a strict ``stance`` / ``focus`` / ``target`` schema) and writes that
15 one plan onto the *shared squad blackboard*, which every unit reads. A late /
16 dropped / failed / malformed call keeps the last good plan; the squad never stalls
17 and never depends on the model being present.
18
19Drop the commander entirely and the squad still plays on the default plan: the LLM
20is provably optional.
21
22Run offline (default: a scripted fake commander reads the battlefield and swaps the
23stance, after a trivial awaitable that proves the off-thread path):
24
25 uv run python examples/demos/squad_commander.py
26
27Run against a real model (records to a local cache so a re-run is deterministic):
28
29 SIMVX_LLM_BASE_URL=http://host:8000/v1 SIMVX_LLM_MODEL=your-model \
30 uv run python examples/demos/squad_commander.py --live
31
32Controls: Space spawns a wave of red enemies (watch the commander switch to attack),
33R forces a manual retreat order, Esc quits.
34
35# /// simvx
36# tags = ["ai", "llm", "rts", "agent"]
37# ///
38"""
39
40from __future__ import annotations
41
42import asyncio
43import json
44import sys
45
46from simvx.ai import DEFAULT_PLAN, PLAN_KEY, CachingClient, OpenAICompatibleClient, SquadCommanderBrain
47from simvx.ai.client import LLMClient, LLMResponse
48from simvx.core import AnchorPreset, Input, InputMap, Key, Label, Node2D, Vec2
49from simvx.core.ai import Action, ActionResult, AgentNode, AIContext, Blackboard, Brain
50from simvx.graphics import App
51
# Window size; passed to App(width=..., height=...) and used to place spawns.
WIDTH = 1000
HEIGHT = 700
# Fallback point for the squad: left side of the field, vertically centred.
RALLY = Vec2(140, HEIGHT / 2)
# Directory handed to CachingClient when running with --live.
CACHE_DIR = "/tmp/simvx_squad_commander_cache"

# Unit movement rate; multiplied by ctx.dt in MoveToward (presumably px/second).
UNIT_SPEED = 130.0
# Radius used when drawing a blue unit.
UNIT_RADIUS = 12.0
# Maximum distance at which a unit damages an enemy instead of advancing.
ATTACK_RANGE = 60.0
# Radius used when drawing a red enemy.
ENEMY_RADIUS = 12.0
60
61
class ScriptedCommanderClient(LLMClient):
    """Offline stand-in for a real model: a short awaitable, then a rule-based plan.

    Lets the demo run with no network while still going through the same
    ``async complete`` entry point a real client would use. The plan is derived
    from the JSON facts that follow the ``"Battlefield: "`` marker in the last
    message.
    """

    async def complete(self, messages, **kwargs) -> LLMResponse:
        """Return a JSON battle plan chosen from the battlefield facts.

        Args:
            messages: Chat messages; the last one's ``"content"`` must contain
                ``"Battlefield: "`` followed by a JSON object.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            An ``LLMResponse`` whose ``text`` is the JSON-encoded plan.
        """
        await asyncio.sleep(0.06)  # simulated model latency

        prompt = messages[-1]["content"]
        facts = json.loads(prompt.split("Battlefield: ", 1)[1])
        enemies = int(facts.get("enemy_count", 0))
        forced = facts.get("forced_retreat", False)

        # A manual order wins; otherwise the stance depends on the head count.
        if forced:
            stance, why = "retreat", "manual fallback"
        elif enemies == 0:
            stance, why = "hold", "no contacts, hold rally"
        elif enemies >= 4:
            stance, why = "retreat", "overwhelmed, fall back"
        else:
            stance, why = "attack", "engage, we have numbers"

        plan = {"stance": stance, "focus": "nearest", "target": None, "rationale": why}
        return LLMResponse(text=json.dumps(plan))
84
85
def _nearest_enemy(ctx: AIContext, pos: Vec2):
    """Find the living enemy closest to ``pos``.

    Enemies are read from the ``"enemies"`` blackboard entry (empty when absent);
    entries whose ``alive`` is falsy are skipped.

    Returns:
        ``(enemy, distance)`` for the closest living enemy, or
        ``(None, float("inf"))`` when there is none. On equal distances the
        earliest enemy in the list wins.
    """
    ranked = (
        (float((foe.position - pos).length()), foe)
        for foe in ctx.blackboard.get("enemies", ())
        if foe.alive
    )
    nearest, nearest_dist = None, float("inf")
    for dist, foe in ranked:
        if dist < nearest_dist:
            nearest, nearest_dist = foe, dist
    return nearest, nearest_dist
97
98
class MoveToward(Action):
    """Action that steps the agent toward a fixed point, capped by ``UNIT_SPEED * dt``."""

    def __init__(self, target: Vec2) -> None:
        # Destination the agent walks toward each time the action executes.
        self.target = target

    def execute(self, ctx: AIContext) -> ActionResult:
        """Advance ``ctx.agent`` toward the target and report success.

        The agent is left where it is once it is within 1.0 of the target;
        otherwise it moves along the straight line, never overshooting.
        """
        mover = ctx.agent
        offset = self.target - mover.position
        remaining = float(offset.length())
        if remaining > 1.0:
            # Clamp the step to the remaining distance so we never overshoot.
            step = min(UNIT_SPEED * ctx.dt, remaining)
            mover.position = mover.position + offset * (step / remaining)
        return ActionResult.success("move")
112
113
class AttackEnemy(Action):
    """Damage the nearest living enemy when in range, otherwise close the distance.

    With no living enemy on the board the unit walks back to ``RALLY``.
    ``unit.firing_at`` is set only on frames where the unit actually fires and
    is cleared on every other path, so a tracer is never left pointing at a
    position where the target no longer exists.
    """

    def execute(self, ctx: AIContext) -> ActionResult:
        """Run one frame of attack behaviour for ``ctx.agent``.

        Returns:
            ``ActionResult.success("attack")`` when the unit fired this frame,
            otherwise the result of the ``MoveToward`` step it took instead.
        """
        unit = ctx.agent
        enemy, dist = _nearest_enemy(ctx, unit.position)
        if enemy is None:
            # Nothing left to shoot. Clear the tracer here too: previously this
            # branch left ``firing_at`` at the last target's position, so the
            # unit kept drawing a fire line at a dead enemy until the stance
            # changed.
            unit.firing_at = None
            return MoveToward(RALLY).execute(ctx)
        if dist <= ATTACK_RANGE:
            # Damage scales with the frame time (90 hp per unit of ctx.dt).
            enemy.hp -= 90.0 * ctx.dt
            # Copy so the tracer end does not alias the enemy's live position.
            unit.firing_at = enemy.position.copy()
            return ActionResult.success("attack")
        unit.firing_at = None
        return MoveToward(enemy.position).execute(ctx)
128
129
class SquadUnitBrain(Brain):
    """Classical per-unit AI: reads the shared plan each decision and picks an Action.

    Uses no LLM code. When the blackboard holds no plan under ``PLAN_KEY`` the
    lookup falls back to ``DEFAULT_PLAN``, so the unit still has a stance to
    follow without a commander.
    """

    def decide(self, ctx: AIContext) -> Action | None:
        """Map the plan's ``"stance"`` to an Action for this unit.

        ``"attack"`` yields ``AttackEnemy``; ``"retreat"`` walks to ``RALLY``;
        any other stance (hold) walks to the unit's ``home`` slot. Every
        non-attack stance also clears the unit's ``firing_at`` tracer.
        """
        stance = ctx.blackboard.get(PLAN_KEY, DEFAULT_PLAN)["stance"]
        if stance == "attack":
            return AttackEnemy()

        # Not attacking: make sure no fire line is drawn this frame.
        unit = ctx.agent
        unit.firing_at = None
        return MoveToward(RALLY if stance == "retreat" else unit.home)
147
148
class Unit(AgentNode):
    """Blue squad member: an ``AgentNode`` whose brain is a ``SquadUnitBrain``."""

    def __init__(self, home: Vec2, squad: Blackboard, **kwargs) -> None:
        """Create a unit that starts on (and holds at) ``home``.

        Args:
            home: Slot the unit settles on while holding.
            squad: Shared squad blackboard, forwarded to ``AgentNode``.
            **kwargs: Forwarded to ``AgentNode`` (e.g. ``name``).
        """
        super().__init__(brain=SquadUnitBrain(), squad=squad, **kwargs)
        # End point of the fire line drawn this frame, or None when not firing.
        self.firing_at: Vec2 | None = None
        self.home = home
        # Start on a copy so moving the unit never mutates its home slot.
        self.position = home.copy()

    def on_draw(self, renderer) -> None:
        """Draw the unit as a filled blue circle, plus a tracer while firing."""
        renderer.draw_circle(self.position, UNIT_RADIUS, colour=(0.3, 0.6, 1.0, 1.0), filled=True)
        tracer_end = self.firing_at
        if tracer_end is None:
            return
        renderer.draw_line(self.position, tracer_end, colour=(1.0, 0.9, 0.3, 0.8), thickness=2.0)
162
163
class Enemy:
    """Red target held as a plain object rather than a scene node.

    The demo root moves, draws and culls enemies itself.
    """

    def __init__(self, position: Vec2) -> None:
        # Every enemy starts at full health (100 hp).
        self.hp = 100.0
        self.position = position

    @property
    def alive(self) -> bool:
        """True while the enemy has strictly positive hit points."""
        return 0.0 < self.hp
174
175
class CommanderHUD(Node2D):
    """Bottom-of-screen label showing whatever plan is currently on the squad board."""

    def __init__(self, squad: Blackboard, **kwargs) -> None:
        super().__init__(**kwargs)
        self.squad = squad
        # Created in on_ready; None until then.
        self.label: Label | None = None

    def on_ready(self) -> None:
        """Build the plan label, anchor it along the bottom edge and attach it."""
        plan_label = Label("...", name="Plan")
        plan_label.set_anchor_preset(AnchorPreset.BOTTOM_WIDE)
        # Margins are applied in left/right/top/bottom order.
        for side, offset in (("left", 20), ("right", 20), ("top", -64), ("bottom", -16)):
            setattr(plan_label, f"margin_{side}", offset)
        plan_label.font_size = 22.0
        plan_label.alignment = "center"
        self.add_child(plan_label)
        self.label = plan_label

    def on_update(self, dt: float) -> None:
        """Refresh the label text from the current (or default) plan."""
        if self.label is not None:
            plan = self.squad.get(PLAN_KEY, DEFAULT_PLAN)
            stance = plan["stance"].upper()
            rationale = plan["rationale"]
            self.label.text = f"COMMANDER stance: {stance} \"{rationale}\""
201
202
class Battle(Node2D):
    """Scene root: owns the squad blackboard and enemy list, spawns units and the commander."""

    def __init__(self, client: LLMClient | None = None, **kwargs) -> None:
        """Set up state; scene children are created later in ``on_ready``.

        Args:
            client: LLM client for the commander. When omitted the scripted
                offline commander is used, so constructing the root with no
                arguments (as the screenshot walker and web export do) works
                offline; ``main()`` passes a real client for ``--live``.
            **kwargs: Forwarded to ``Node2D``.
        """
        super().__init__(**kwargs)
        if client is None:
            client = ScriptedCommanderClient()
        self._client = client
        self.squad = Blackboard()
        self.enemies: list[Enemy] = []
        # Remaining time for which the manual retreat flag stays raised.
        self._forced_retreat_t = 0.0
        self.commander: AgentNode | None = None
        self.title: Label | None = None

    def on_ready(self) -> None:
        """Register input actions, seed the board, and add units, commander, HUD and title."""
        for action, key in (("spawn_wave", Key.SPACE), ("retreat", Key.R), ("quit", Key.ESCAPE)):
            InputMap.add_action(action, [key])

        # Shared squad facts. The commander is given only the names listed in
        # ``facts`` below, not the whole scene.
        board = self.squad
        board.set("enemies", self.enemies)
        board.set("enemy_count", 0)
        board.set("forced_retreat", False)

        # Five classical units in a vertical column, each handed the squad board.
        for slot in range(5):
            self.add_child(Unit(Vec2(220, 160 + slot * 95), board, name=f"Unit{slot}"))

        # Low-frequency commander: configured with a 2.5 period and the squad
        # board as the place its plan is written.
        commander_brain = SquadCommanderBrain(
            self._client,
            board,
            facts=["enemy_count", "forced_retreat"],
            period=2.5,
        )
        self.commander = AgentNode(brain=commander_brain, name="Commander")
        self.add_child(self.commander)
        self.add_child(CommanderHUD(board, name="HUD"))

        banner = Label("Squad Commander - Space: spawn wave R: retreat Esc: quit", name="Title")
        banner.set_anchor_preset(AnchorPreset.CENTER_TOP)
        banner.margin_left = -320
        banner.margin_right = 320
        banner.margin_top = 16
        banner.margin_bottom = 40
        banner.font_size = 18.0
        banner.alignment = "center"
        self.add_child(banner)
        self.title = banner

    def on_update(self, dt: float) -> None:
        """Handle input, move and cull enemies, then publish the facts the commander reads."""
        if Input.is_action_just_pressed("spawn_wave"):
            # A wave is three enemies stacked vertically near the right edge.
            self.enemies.extend(Enemy(Vec2(WIDTH - 120, 180 + row * 130)) for row in range(3))
        if Input.is_action_just_pressed("retreat"):
            self._forced_retreat_t = 3.0
        if Input.is_action_just_pressed("quit"):
            self.app.quit()

        # Enemies drift left toward the squad.
        drift = 25.0 * dt
        for foe in self.enemies:
            foe.position.x -= drift
        # Cull in place: the blackboard holds this same list object, so it
        # must be mutated rather than rebound.
        survivors = [foe for foe in self.enemies if foe.alive and foe.position.x > 60]
        self.enemies[:] = survivors

        # Refresh the facts every frame.
        remaining = max(0.0, self._forced_retreat_t - dt)
        self._forced_retreat_t = remaining
        self.squad.set("enemy_count", len(self.enemies))
        self.squad.set("forced_retreat", remaining > 0.0)

    def on_draw(self, renderer) -> None:
        """Draw the rally marker, then each enemy with a health bar above it."""
        renderer.draw_circle(RALLY, 18, colour=(0.3, 0.6, 1.0, 0.25), filled=True)
        for foe in self.enemies:
            centre = foe.position
            renderer.draw_circle(centre, ENEMY_RADIUS, colour=(1.0, 0.35, 0.35, 1.0), filled=True)
            # Bar is 28 wide at full health and shrinks with hp, never below 0.
            bar_width = 28 * max(0.0, foe.hp) / 100.0
            renderer.draw_rect(
                (centre.x - 14, centre.y - 22),
                (bar_width, 4),
                colour=(0.4, 1.0, 0.4, 1.0),
                filled=True,
            )
287
288
def _build_client(live: bool) -> LLMClient:
    """Pick the commander's client.

    Args:
        live: When true, wrap an environment-configured OpenAI-compatible client
            in a ``CachingClient`` rooted at ``CACHE_DIR``; otherwise use the
            scripted offline commander.
    """
    if live:
        return CachingClient(OpenAICompatibleClient.from_env(), CACHE_DIR, mode="auto")
    return ScriptedCommanderClient()
293
294
def main() -> None:
    """Run the demo; pass ``--live`` on the command line to use a real model."""
    use_live = "--live" in sys.argv
    window = App(title="SimVX Squad Commander", width=WIDTH, height=HEIGHT)
    root = Battle(_build_client(use_live), name="Battle")
    window.run(root)


# Only start the app when executed as a script, not when imported.
if __name__ == "__main__":
    main()