Source code for simvx.core.process_node

"""ProcessNode — Node wrapping subprocess.Popen with non-blocking I/O.

Supports both pipe-based (default) and PTY-based subprocess management.
PTY mode is required for interactive terminal applications like bash, nano, vim.
"""


from __future__ import annotations

import fcntl
import logging
import os
import pty
import selectors
import shlex
import struct
import subprocess
import termios

from .node import Node
from .descriptors import Property, Signal

log = logging.getLogger(__name__)


[docs] class ProcessNode(Node): """Node that manages a subprocess with non-blocking I/O. Emits stdout_data(str) and stderr_data(str) as output arrives, and process_exited(int) when the child terminates. The process is polled each frame in process(dt) with zero-timeout selector calls so the game loop is never blocked. For interactive terminal applications, set use_pty=True to allocate a pseudo-terminal. In PTY mode, stdout and stderr are merged. """ auto_start = Property(False, hint="Start process on tree entry") command = Property("", hint="Command to execute") def __init__(self, command: str = "", *, use_pty: bool = False, env: dict[str, str] | None = None, **kwargs): super().__init__(**kwargs) self.command = command self.use_pty = use_pty self.env = env self.stdout_data = Signal() self.stderr_data = Signal() self.process_exited = Signal() self._proc: subprocess.Popen | None = None self._sel: selectors.DefaultSelector | None = None self._exit_code: int | None = None self._master_fd: int | None = None
[docs] def ready(self): if self.auto_start and self.command: self.start()
[docs] def start(self, command: str | None = None): """Launch the subprocess. Replaces any already-running process.""" if command is not None: self.command = command if not self.command: log.error("ProcessNode.start() called with empty command") return if self._proc is not None: self.stop() args = shlex.split(self.command) if isinstance(self.command, str) else self.command self._exit_code = None if self.use_pty: self._start_pty(args) else: self._start_pipe(args)
def _start_pipe(self, args: list[str]): """Start subprocess with stdin/stdout/stderr pipes.""" try: self._proc = subprocess.Popen( args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env or os.environ.copy(), ) except OSError as e: log.error("Failed to start process %r: %s", self.command, e) return self._sel = selectors.DefaultSelector() for stream, name in ((self._proc.stdout, "stdout"), (self._proc.stderr, "stderr")): os.set_blocking(stream.fileno(), False) self._sel.register(stream, selectors.EVENT_READ, name) def _start_pty(self, args: list[str]): """Start subprocess with a pseudo-terminal.""" master, slave = pty.openpty() env = (self.env or os.environ).copy() env["TERM"] = "xterm-256color" try: self._proc = subprocess.Popen( args, stdin=slave, stdout=slave, stderr=slave, close_fds=True, preexec_fn=os.setsid, env=env, ) except OSError as e: os.close(master) os.close(slave) log.error("Failed to start PTY process %r: %s", self.command, e) return os.close(slave) self._master_fd = master os.set_blocking(master, False) self._sel = selectors.DefaultSelector() self._sel.register(master, selectors.EVENT_READ, "pty")
[docs] def resize(self, cols: int, rows: int): """Set the PTY terminal size. Only works in PTY mode.""" if self._master_fd is not None: winsize = struct.pack("HHHH", rows, cols, 0, 0) fcntl.ioctl(self._master_fd, termios.TIOCSWINSZ, winsize) # Send SIGWINCH to the process group if self._proc and self._proc.pid: try: os.killpg(os.getpgid(self._proc.pid), 28) # SIGWINCH except (OSError, ProcessLookupError): pass
[docs] def stop(self): """Terminate the subprocess, escalating to kill if needed.""" if self._proc is None: return try: self._proc.terminate() try: self._proc.wait(timeout=0.5) except subprocess.TimeoutExpired: self._proc.kill() self._proc.wait(timeout=1.0) except OSError as e: log.warning("Error stopping process: %s", e) self._exit_code = self._proc.returncode self._cleanup()
[docs] def write(self, data: str): """Write data to the subprocess stdin (or PTY master).""" raw = data.encode() if isinstance(data, str) else data if self._master_fd is not None: try: os.write(self._master_fd, raw) except OSError as e: log.warning("PTY write failed: %s", e) return if self._proc is None or self._proc.stdin is None: return try: self._proc.stdin.write(raw) self._proc.stdin.flush() except (BrokenPipeError, OSError) as e: log.warning("ProcessNode write failed: %s", e)
[docs] def process(self, dt: float): if self._proc is None or self._sel is None: return # Read any available output for key, _ in self._sel.select(timeout=0): try: if key.data == "pty": chunk = os.read(self._master_fd, 65536) else: chunk = key.fileobj.read(65536) except OSError: chunk = None if chunk: text = chunk.decode(errors="replace") if key.data in ("stdout", "pty"): self.stdout_data(text) else: self.stderr_data(text) # Check for process exit if self._proc.poll() is not None: self._drain() self._exit_code = self._proc.returncode self._cleanup() self.process_exited(self._exit_code)
def _drain(self): """Read any remaining buffered output after process exits.""" if self._master_fd is not None: try: while True: chunk = os.read(self._master_fd, 65536) if not chunk: break self.stdout_data(chunk.decode(errors="replace")) except OSError: pass return if self._proc is None: return for stream, name in ((self._proc.stdout, "stdout"), (self._proc.stderr, "stderr")): if stream is None: continue try: remaining = stream.read() except OSError: remaining = None if remaining: text = remaining.decode(errors="replace") if name == "stdout": self.stdout_data(text) else: self.stderr_data(text) def _cleanup(self): """Close selector, master fd, and release process handle.""" if self._sel is not None: self._sel.close() self._sel = None if self._master_fd is not None: try: os.close(self._master_fd) except OSError: pass self._master_fd = None if self._proc is not None: for stream in (self._proc.stdin, self._proc.stdout, self._proc.stderr): if stream: try: stream.close() except OSError: pass self._proc = None
[docs] def exit_tree(self): if self._proc is not None: self.stop()
@property def running(self) -> bool: return self._proc is not None and self._proc.poll() is None @property def exit_code(self) -> int | None: if self._proc is not None: return self._proc.returncode return self._exit_code