"""PyConsoleNode -- an in-process Python REPL that speaks the TerminalEmulator contract.
Where ShellNode wraps an OS subprocess (and needs a real PTY for line editing),
PyConsoleNode runs an interactive interpreter *inside the running app*. It exposes
the same duck-typed surface a TerminalEmulator attaches to -- ``stdout_data`` /
``stderr_data`` / ``process_exited`` signals plus ``write()`` / ``start()`` /
``stop()`` / ``resize()`` -- so the same widget drives either backend.
Because it is in-process it works identically on desktop and in the browser
(Pyodide), where no subprocess or pty device exists. It also shares the host
interpreter, so the namespace can expose live objects (the scene tree, nodes)
for debugging -- something an external ``python3`` subprocess cannot do.
The flip side of in-process execution: a command that blocks (``input()``, a long
loop, ``time.sleep``) blocks the frame loop. ``input()`` is therefore replaced with
a clear error; long-running work is the caller's responsibility.
"""
import code
import logging
import re
import sys
from contextlib import redirect_stderr, redirect_stdout
from .node import Node
from .signals import Signal
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# A complete escape sequence sent by the terminal on a special key: CSI (``\x1b[``,
# optional digits/semicolons, then a final letter or ``~``) or SS3 (``\x1bO`` followed
# by one letter, used for F1-F4). The pattern itself is unanchored; callers choose
# between ``match`` and ``search``.
_ESC_SEQ = re.compile(r"\x1b(?:\[[0-9;]*[A-Za-z~]|O[A-Za-z])")
class _Sink:
"""Minimal write-only stream that forwards text to a callback."""
def __init__(self, emit):
self._emit = emit
def write(self, text: str) -> int:
if text:
self._emit(text)
return len(text)
def flush(self):
pass
def isatty(self) -> bool:
return False
def _blocked_input(*_args, **_kwargs):
raise RuntimeError("input() is not available in the embedded console (it would block the frame loop)")
class PyConsoleNode(Node):
    """Interactive in-process Python REPL, attachable to a ``TerminalEmulator``.

    Emits ``stdout_data(str)`` for results/prints and prompts, ``stderr_data(str)``
    for tracebacks, and ``process_exited(int)`` when the console is closed (Ctrl-D
    on an empty line, ``exit()``, or :meth:`stop`).

    ``namespace`` seeds the interpreter globals; pass live objects here to inspect
    them from the console (e.g. ``{"root": my_scene}``).
    """

    def __init__(self, namespace: dict | None = None, *, banner: str | None = None, **kwargs):
        """Create the console without starting it.

        Args:
            namespace: Extra names merged into the interpreter globals. Applied after
                the built-in entries, so it can override them (including ``input``).
            banner: Text printed by :meth:`start`. ``None`` selects a default banner;
                an empty string prints no banner.
            **kwargs: Forwarded unchanged to the base ``Node`` constructor.
        """
        super().__init__(**kwargs)
        self.stdout_data = Signal()
        self.stderr_data = Signal()
        self.process_exited = Signal()
        self._banner = banner
        self._cols = 80
        self._running = False
        self._exited = False

        ns = {"__name__": "__console__", "__builtins__": __builtins__}
        # Shadow the blocking builtins at globals level so user code hits the stub first.
        ns["input"] = ns["raw_input"] = _blocked_input
        if namespace:
            ns.update(namespace)
        self._ns = ns
        self._console = code.InteractiveConsole(locals=ns)
        self._out = _Sink(self.stdout_data)
        self._err = _Sink(self.stderr_data)

        # Line-editing state for the current input line.
        self._line = ""
        self._pos = 0
        self._more = False  # inside a multi-line block (ps2)
        self._pending = ""  # text received but not yet parsed (partial escape)

        # History (most-recent last); _hist_idx points into it during recall.
        self._history: list[str] = []
        self._hist_idx: int | None = None
        self._stash = ""  # edit-in-progress saved when browsing history

    # -- ShellNode-compatible surface -----------------------------------------

    @property
    def ps1(self) -> str:
        """Primary prompt: ``sys.ps1`` when defined, else ``">>> "``."""
        # str() because sys.ps1 may be any object; the renderer needs len() of the text.
        return str(getattr(sys, "ps1", ">>> "))

    @property
    def ps2(self) -> str:
        """Continuation prompt: ``sys.ps2`` when defined, else ``"... "``."""
        return str(getattr(sys, "ps2", "... "))

    @property
    def running(self) -> bool:
        """Whether the console has been started and not yet closed."""
        return self._running

    def start(self, *_args):
        """Print the banner and the first prompt.

        Does nothing if the console is already running. Positional arguments are
        accepted and ignored. Any input state left over from a previous session
        (half-typed line, open multi-line block, unparsed bytes) is discarded so a
        restart always begins at a clean primary prompt; history and the namespace
        are kept.
        """
        if self._running:
            return
        self._pending = ""
        self._more = False
        self._reset_line()
        # push() leaves its buffer filled when SystemExit propagates out of it.
        self._console.resetbuffer()
        self._running = True
        self._exited = False
        banner = self._banner
        if banner is None:
            banner = f"Python {sys.version.split()[0]} -- embedded SimVX console"
        if banner:
            self.stdout_data(banner.replace("\n", "\r\n") + "\r\n")
        self._prompt()

    def stop(self):
        """Close the console, emitting ``process_exited`` once."""
        self._finish(0)

    def resize(self, cols: int, _rows: int):
        """Record the terminal width (clamped to at least 1); the row count is ignored."""
        self._cols = max(1, int(cols))

    def write(self, data: str):
        """Feed terminal keystrokes (raw chars + escape sequences) to the line editor.

        ``data`` may be ``str`` or a bytes-like object with ``decode``; ``None`` and
        calls made while the console is not running are ignored. Input is buffered so
        an escape sequence split across calls is still recognised. Processing stops
        as soon as a keystroke closes the console; the rest of the buffer is not
        executed.
        """
        if not self._running or data is None:
            return
        self._pending += data if isinstance(data, str) else data.decode(errors="replace")
        while self._pending and self._running:
            m = _ESC_SEQ.match(self._pending)
            if m:
                self._pending = self._pending[m.end():]
                self._key(m.group())
                continue
            if self._pending[0] == "\x1b":
                # A lone ESC, or ESC followed by ordinary text, is simply dropped.
                if len(self._pending) == 1 or self._pending[1] not in "[O":
                    self._pending = self._pending[1:]
                    continue
                # Wait only while the buffer is a prefix that can still become a
                # valid sequence; a malformed one must not stall input forever.
                if self._is_partial_escape():
                    break
                # Malformed sequence: fall through. The ESC is ignored by _char()
                # and the remaining characters are treated as ordinary input.
            ch = self._pending[0]
            self._pending = self._pending[1:]
            self._char(ch)

    def _is_partial_escape(self) -> bool:
        """Return True when ``_pending`` is a CSI/SS3 prefix that more input could complete.

        Only call this when ``_pending`` starts with ESC followed by ``[`` or ``O``
        and is not already a complete sequence.
        """
        introducer, params = self._pending[1], self._pending[2:]
        if introducer == "O":
            return not params  # SS3 takes exactly one final letter
        return not params.strip("0123456789;")  # CSI: only parameter chars so far

    # -- key / character handling ----------------------------------------------

    def _key(self, seq: str):
        """Handle a parsed escape sequence (arrows, home/end, delete).

        Dispatch is mostly on the final character, so CSI and SS3 forms of the same
        key are treated alike. Unrecognised sequences are ignored.
        """
        final = seq[-1]
        if final == "A":  # up -- older history
            self._history_prev()
        elif final == "B":  # down -- newer history
            self._history_next()
        elif final == "C":  # right
            if self._pos < len(self._line):
                self._pos += 1
                self.stdout_data("\x1b[C")
        elif final == "D":  # left
            if self._pos > 0:
                self._pos -= 1
                self.stdout_data("\x1b[D")
        elif final == "H" or seq == "\x1b[1~":  # home
            self._pos = 0
            self._redraw()
        elif final == "F" or seq == "\x1b[4~":  # end
            self._pos = len(self._line)
            self._redraw()
        elif seq == "\x1b[3~":  # delete (forward)
            if self._pos < len(self._line):
                self._line = self._line[: self._pos] + self._line[self._pos + 1 :]
                self._redraw()
        # pageup/pagedown and anything else: ignored (scrollback is the widget's job)

    def _char(self, ch: str):
        """Handle a single non-escape character."""
        if ch in ("\r", "\n"):
            self._submit()
        elif ch == "\x7f" or ch == "\b":  # backspace
            if self._pos > 0:
                self._line = self._line[: self._pos - 1] + self._line[self._pos :]
                self._pos -= 1
                self._redraw()
        elif ch == "\x03":  # Ctrl-C: abandon the current line
            self.stdout_data("^C\r\n")
            self._reset_line()
            self._more = False
            self._console.resetbuffer()
            self._prompt()
        elif ch == "\x04":  # Ctrl-D: EOF on empty line closes the console
            if not self._line:
                self.stdout_data("\r\n")
                self._finish(0)
        elif ch == "\x0c":  # Ctrl-L: clear screen, reprint prompt + line
            self.stdout_data("\x1b[2J\x1b[H")
            self._prompt(redraw=True)
        elif ch == "\t":
            # NOTE(review): inserts a single space -- confirm a wider indent was not intended.
            self._insert(" ")
        elif ch >= " ":
            self._insert(ch)
        # other control codes: ignored

    def _insert(self, text: str):
        """Insert *text* at the cursor, advance the cursor past it, and redraw."""
        self._line = self._line[: self._pos] + text + self._line[self._pos :]
        self._pos += len(text)
        self._redraw()

    # -- history ---------------------------------------------------------------

    def _history_prev(self):
        """Recall the next-older history entry, stashing the in-progress line first."""
        if not self._history:
            return
        if self._hist_idx is None:
            self._stash = self._line
            self._hist_idx = len(self._history)
        if self._hist_idx > 0:
            self._hist_idx -= 1
        self._set_line(self._history[self._hist_idx])

    def _history_next(self):
        """Recall the next-newer history entry, or restore the stashed line past the end."""
        if self._hist_idx is None:
            return
        self._hist_idx += 1
        if self._hist_idx >= len(self._history):
            self._hist_idx = None
            self._set_line(self._stash)
        else:
            self._set_line(self._history[self._hist_idx])

    def _set_line(self, text: str):
        """Replace the input line with *text*, put the cursor at its end, and redraw."""
        self._line = text
        self._pos = len(text)
        self._redraw()

    # -- rendering -------------------------------------------------------------

    def _prompt(self, *, redraw: bool = False):
        """Emit the active prompt; with ``redraw`` also reprint the current line in place."""
        if redraw:
            self._redraw()
        else:
            self.stdout_data(self.ps2 if self._more else self.ps1)

    def _redraw(self):
        """Reprint the current input line in place (single-row lines)."""
        prompt = self.ps2 if self._more else self.ps1
        # CR + erase-to-end-of-line, the prompt and text, then an absolute column move
        # (1-based) to put the cursor back at the edit position.
        self.stdout_data("\r\x1b[K" + prompt + self._line + f"\x1b[{len(prompt) + self._pos + 1}G")

    def _reset_line(self):
        """Clear the input line and leave history-browsing mode."""
        self._line = ""
        self._pos = 0
        self._hist_idx = None
        self._stash = ""

    # -- execution -------------------------------------------------------------

    def _submit(self):
        """Run the current line through the interpreter and print the next prompt.

        Non-blank lines are appended to history unless they repeat the latest entry.
        Output written to ``sys.stdout`` / ``sys.stderr`` during execution is routed
        to the ``stdout_data`` / ``stderr_data`` signals. ``SystemExit`` closes the
        console; any other exception escaping the interpreter is logged and the
        console keeps running.
        """
        line = self._line
        self.stdout_data("\r\n")
        if line.strip() and (not self._history or self._history[-1] != line):
            self._history.append(line)
        self._reset_line()
        try:
            with redirect_stdout(self._out), redirect_stderr(self._err):
                self._more = self._console.push(line)
        except SystemExit as e:
            self._finish(self._exit_status(e.code))
            return
        except BaseException:  # noqa: BLE001 -- the console must survive any user error
            self._console.resetbuffer()
            self._more = False
            log.debug("console command raised", exc_info=True)
        if self._running:
            self._prompt()

    def _exit_status(self, status) -> int:
        """Map a ``SystemExit.code`` to an integer status, following ``sys.exit``.

        ``None`` means success (0), an int is used as-is, and any other object is
        written to ``stderr_data`` and reported as status 1.
        """
        if status is None:
            return 0
        if isinstance(status, int):
            return int(status)
        self.stderr_data(str(status).replace("\n", "\r\n") + "\r\n")
        return 1

    def _finish(self, code_: int):
        """Mark the console closed and emit ``process_exited(code_)``, at most once."""
        if self._exited:
            return
        self._exited = True
        self._running = False
        self.process_exited(code_)

    def on_exit_tree(self):
        """Close a still-running console with status 0."""
        if self._running:
            self._finish(0)