Source code for simvx.core.py_console

"""PyConsoleNode -- an in-process Python REPL that speaks the TerminalEmulator contract.

Where ShellNode wraps an OS subprocess (and needs a real PTY for line editing),
PyConsoleNode runs an interactive interpreter *inside the running app*. It exposes
the same duck-typed surface a TerminalEmulator attaches to -- ``stdout_data`` /
``stderr_data`` / ``process_exited`` signals plus ``write()`` / ``start()`` /
``stop()`` / ``resize()`` -- so the same widget drives either backend.

Because it is in-process it works identically on desktop and in the browser
(Pyodide), where no subprocess or pty device exists. It also shares the host
interpreter, so the namespace can expose live objects (the scene tree, nodes)
for debugging -- something an external ``python3`` subprocess cannot do.

The flip side of in-process execution: a command that blocks (``input()``, a long
loop, ``time.sleep``) blocks the frame loop. ``input()`` is therefore replaced with
a clear error; long-running work is the caller's responsibility.
"""

import code
import logging
import re
import sys
from contextlib import redirect_stderr, redirect_stdout

from .node import Node
from .signals import Signal

log = logging.getLogger(__name__)

# A complete escape sequence sent by the terminal on a special key: CSI (``\x1b[`` …
# final letter or ``~``) or SS3 (``\x1bO`` … letter, used for F1-F4).
_ESC_SEQ = re.compile(r"\x1b(?:\[[0-9;]*[A-Za-z~]|O[A-Za-z])")


class _Sink:
    """Minimal write-only stream that forwards text to a callback."""

    def __init__(self, emit):
        self._emit = emit

    def write(self, text: str) -> int:
        if text:
            self._emit(text)
        return len(text)

    def flush(self):
        pass

    def isatty(self) -> bool:
        return False


def _blocked_input(*_args, **_kwargs):
    raise RuntimeError("input() is not available in the embedded console (it would block the frame loop)")


[docs] class PyConsoleNode(Node): """Interactive in-process Python REPL, attachable to a ``TerminalEmulator``. Emits ``stdout_data(str)`` for results/prints and prompts, ``stderr_data(str)`` for tracebacks, and ``process_exited(int)`` when the console is closed (Ctrl-D on an empty line, ``exit()``, or :meth:`stop`). ``namespace`` seeds the interpreter globals; pass live objects here to inspect them from the console (e.g. ``{"root": my_scene}``). """ def __init__(self, namespace: dict | None = None, *, banner: str | None = None, **kwargs): super().__init__(**kwargs) self.stdout_data = Signal() self.stderr_data = Signal() self.process_exited = Signal() self._banner = banner self._cols = 80 self._running = False self._exited = False ns = {"__name__": "__console__", "__builtins__": __builtins__} ns["input"] = ns["raw_input"] = _blocked_input if namespace: ns.update(namespace) self._ns = ns self._console = code.InteractiveConsole(locals=ns) self._out = _Sink(self.stdout_data) self._err = _Sink(self.stderr_data) # Line-editing state for the current input line. self._line = "" self._pos = 0 self._more = False # inside a multi-line block (ps2) self._pending = "" # bytes received but not yet parsed (partial escape) # History (most-recent last); _hist_idx points into it during recall. self._history: list[str] = [] self._hist_idx: int | None = None self._stash = "" # edit-in-progress saved when browsing history # -- ShellNode-compatible surface -----------------------------------------
[docs] @property def ps1(self) -> str: return getattr(sys, "ps1", ">>> ")
[docs] @property def ps2(self) -> str: return getattr(sys, "ps2", "... ")
[docs] @property def running(self) -> bool: return self._running
[docs] def start(self, *_args): """Print the banner and the first prompt.""" if self._running: return self._running = True self._exited = False banner = self._banner if banner is None: banner = f"Python {sys.version.split()[0]} -- embedded SimVX console" if banner: self.stdout_data(banner.replace("\n", "\r\n") + "\r\n") self._prompt()
[docs] def stop(self): """Close the console, emitting ``process_exited`` once.""" self._finish(0)
[docs] def resize(self, cols: int, _rows: int): self._cols = max(1, int(cols))
[docs] def write(self, data: str): """Feed terminal keystrokes (raw chars + escape sequences) to the line editor.""" if not self._running or data is None: return self._pending += data if isinstance(data, str) else data.decode(errors="replace") while self._pending: m = _ESC_SEQ.match(self._pending) if m: self._key(m.group()) self._pending = self._pending[m.end() :] continue if self._pending[0] == "\x1b": # Lone ESC or an incomplete sequence: wait for the rest, unless it is # clearly just a bare ESC followed by ordinary text. if len(self._pending) == 1 or self._pending[1] not in "[O": self._pending = self._pending[1:] # drop bare ESC continue if not _ESC_SEQ.search(self._pending): break # partial sequence; await more input ch = self._pending[0] self._pending = self._pending[1:] self._char(ch)
# -- key / character handling ---------------------------------------------- def _key(self, seq: str): """Handle a parsed escape sequence (arrows, home/end, delete).""" final = seq[-1] if final == "A": # up -- older history self._history_prev() elif final == "B": # down -- newer history self._history_next() elif final == "C": # right if self._pos < len(self._line): self._pos += 1 self.stdout_data("\x1b[C") elif final == "D": # left if self._pos > 0: self._pos -= 1 self.stdout_data("\x1b[D") elif final == "H" or seq == "\x1b[1~": # home self._pos = 0 self._redraw() elif final == "F" or seq == "\x1b[4~": # end self._pos = len(self._line) self._redraw() elif seq == "\x1b[3~": # delete (forward) if self._pos < len(self._line): self._line = self._line[: self._pos] + self._line[self._pos + 1 :] self._redraw() # pageup/pagedown and anything else: ignored (scrollback is the widget's job) def _char(self, ch: str): """Handle a single non-escape character.""" if ch in ("\r", "\n"): self._submit() elif ch == "\x7f" or ch == "\b": # backspace if self._pos > 0: self._line = self._line[: self._pos - 1] + self._line[self._pos :] self._pos -= 1 self._redraw() elif ch == "\x03": # Ctrl-C: abandon the current line self.stdout_data("^C\r\n") self._reset_line() self._more = False self._console.resetbuffer() self._prompt() elif ch == "\x04": # Ctrl-D: EOF on empty line closes the console if not self._line: self.stdout_data("\r\n") self._finish(0) elif ch == "\x0c": # Ctrl-L: clear screen, reprint prompt + line self.stdout_data("\x1b[2J\x1b[H") self._prompt(redraw=True) elif ch == "\t": self._insert(" ") elif ch >= " ": self._insert(ch) # other control codes: ignored def _insert(self, text: str): self._line = self._line[: self._pos] + text + self._line[self._pos :] self._pos += len(text) self._redraw() # -- history --------------------------------------------------------------- def _history_prev(self): if not self._history: return if self._hist_idx is None: self._stash = self._line self._hist_idx = len(self._history) if self._hist_idx > 0: self._hist_idx -= 1 self._set_line(self._history[self._hist_idx]) def _history_next(self): if self._hist_idx is None: return self._hist_idx += 1 if self._hist_idx >= len(self._history): self._hist_idx = None self._set_line(self._stash) else: self._set_line(self._history[self._hist_idx]) def _set_line(self, text: str): self._line = text self._pos = len(text) self._redraw() # -- rendering ------------------------------------------------------------- def _prompt(self, *, redraw: bool = False): prompt = self.ps2 if self._more else self.ps1 if redraw: self.stdout_data("\r\x1b[K" + prompt + self._line + f"\x1b[{len(prompt) + self._pos + 1}G") else: self.stdout_data(prompt) def _redraw(self): """Reprint the current input line in place (single-row lines).""" prompt = self.ps2 if self._more else self.ps1 self.stdout_data("\r\x1b[K" + prompt + self._line + f"\x1b[{len(prompt) + self._pos + 1}G") def _reset_line(self): self._line = "" self._pos = 0 self._hist_idx = None self._stash = "" # -- execution ------------------------------------------------------------- def _submit(self): line = self._line self.stdout_data("\r\n") if line.strip() and (not self._history or self._history[-1] != line): self._history.append(line) self._reset_line() try: with redirect_stdout(self._out), redirect_stderr(self._err): self._more = self._console.push(line) except SystemExit as e: self._finish(int(e.code) if isinstance(e.code, int) else 0) return except BaseException: # noqa: BLE001 -- the console must survive any user error self._console.resetbuffer() self._more = False log.debug("console command raised", exc_info=True) if self._running: self._prompt() def _finish(self, code_: int): if self._exited: return self._exited = True self._running = False self.process_exited(code_)
[docs] def on_exit_tree(self): if self._running: self._finish(0)