"""ProcessNode — Node wrapping subprocess.Popen with non-blocking I/O.
Supports both pipe-based (default) and PTY-based subprocess management.
PTY mode is required for interactive terminal applications like bash, nano, vim.
"""
from __future__ import annotations

import codecs
import fcntl
import logging
import os
import pty
import selectors
import shlex
import signal
import struct
import subprocess
import termios

from .node import Node
from .descriptors import Property, Signal
log = logging.getLogger(__name__)
[docs]
class ProcessNode(Node):
    """Node that manages a subprocess with non-blocking output reads.

    Emits ``stdout_data(str)`` and ``stderr_data(str)`` as output arrives, and
    ``process_exited(int)`` when ``process(dt)`` observes that the child has
    terminated. Output is polled each frame with zero-timeout selector calls,
    so reading never blocks the caller. ``stop()`` does not emit
    ``process_exited``; it only records the exit code.

    For interactive terminal applications, set ``use_pty=True`` to allocate
    a pseudo-terminal. In PTY mode, stdout and stderr are merged and
    delivered through ``stdout_data``.
    """

    auto_start = Property(False, hint="Start process on tree entry")
    command = Property("", hint="Command to execute")

    # Maximum number of bytes requested from a stream per read call.
    _READ_SIZE = 65536

    def __init__(self, command: str = "", *, use_pty: bool = False, env: dict[str, str] | None = None, **kwargs):
        """Create the node without launching anything.

        Args:
            command: Command line to run; split with ``shlex`` when it is a str.
            use_pty: Run the child on a pseudo-terminal instead of pipes.
            env: Environment for the child. ``None`` (or an empty mapping)
                means "inherit the current environment".
            **kwargs: Forwarded unchanged to the ``Node`` constructor.
        """
        super().__init__(**kwargs)
        self.command = command
        self.use_pty = use_pty
        self.env = env
        self.stdout_data = Signal()
        self.stderr_data = Signal()
        self.process_exited = Signal()
        self._proc: subprocess.Popen | None = None
        self._sel: selectors.BaseSelector | None = None
        self._exit_code: int | None = None
        self._master_fd: int | None = None
        # One incremental UTF-8 decoder per stream name ("stdout", "stderr",
        # "pty") so multi-byte characters split across reads decode correctly.
        self._decoders: dict[str, codecs.IncrementalDecoder] = {}

    def ready(self) -> None:
        """Start the process if ``auto_start`` is set and a command is configured."""
        if self.auto_start and self.command:
            self.start()

    def start(self, command: str | None = None) -> None:
        """Launch the subprocess. Replaces any already-running process.

        Args:
            command: Optional new command; when given it overwrites
                ``self.command`` before launching.

        Failures (empty command, unparsable command line, spawn errors) are
        logged and leave the node without a running process; nothing is raised.
        """
        if command is not None:
            self.command = command
        if not self.command:
            log.error("ProcessNode.start() called with empty command")
            return
        if self._proc is not None:
            self.stop()

        try:
            args = shlex.split(self.command) if isinstance(self.command, str) else self.command
        except ValueError as e:
            # shlex raises ValueError for e.g. an unterminated quote.
            log.error("Failed to parse command %r: %s", self.command, e)
            return
        if not args:
            # A whitespace-only command splits to []; Popen([]) would raise IndexError.
            log.error("ProcessNode.start() called with empty command")
            return

        self._exit_code = None
        self._decoders = {}
        if self.use_pty:
            self._start_pty(args)
        else:
            self._start_pipe(args)

    def _child_env(self) -> dict[str, str]:
        """Return a fresh environment dict for the child process."""
        return dict(self.env or os.environ)

    def _start_pipe(self, args: list[str]) -> None:
        """Start subprocess with stdin/stdout/stderr pipes."""
        try:
            proc = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=self._child_env(),
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            log.error("Failed to start process %r: %s", self.command, e)
            return

        self._proc = proc
        self._sel = selectors.DefaultSelector()
        for stream, name in ((proc.stdout, "stdout"), (proc.stderr, "stderr")):
            # Non-blocking so a read in process() returns immediately.
            os.set_blocking(stream.fileno(), False)
            self._sel.register(stream, selectors.EVENT_READ, name)

    def _start_pty(self, args: list[str]) -> None:
        """Start subprocess with a pseudo-terminal."""
        try:
            master, slave = pty.openpty()
        except OSError as e:
            log.error("Failed to start PTY process %r: %s", self.command, e)
            return

        env = self._child_env()
        env["TERM"] = "xterm-256color"
        try:
            proc = subprocess.Popen(
                args,
                stdin=slave,
                stdout=slave,
                stderr=slave,
                close_fds=True,
                # Same effect as preexec_fn=os.setsid, without running Python
                # code between fork() and exec().
                start_new_session=True,
                env=env,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            os.close(master)
            log.error("Failed to start PTY process %r: %s", self.command, e)
            return
        finally:
            # The parent only talks to the master end; the child holds its
            # own copies of the slave, so ours is closed on every path.
            os.close(slave)

        self._proc = proc
        self._master_fd = master
        os.set_blocking(master, False)
        self._sel = selectors.DefaultSelector()
        self._sel.register(master, selectors.EVENT_READ, "pty")

    def resize(self, cols: int, rows: int) -> None:
        """Set the PTY terminal size. Only works in PTY mode.

        Args:
            cols: Terminal width in character cells.
            rows: Terminal height in character cells.

        Both values are clamped to the unsigned 16-bit range the winsize
        structure is packed with. Does nothing when no PTY is open.
        """
        fd = self._master_fd
        if fd is None:
            return

        rows = max(0, min(0xFFFF, int(rows)))
        cols = max(0, min(0xFFFF, int(cols)))
        winsize = struct.pack("HHHH", rows, cols, 0, 0)
        try:
            fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
        except OSError as e:
            log.warning("PTY resize failed: %s", e)
            return

        # Send SIGWINCH to the child's process group. Skipped once the child
        # has been reaped (returncode set), because its PID may then belong
        # to an unrelated process.
        proc = self._proc
        if proc is not None and proc.pid and proc.returncode is None:
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGWINCH)
            except OSError:
                pass  # best effort: the group may already be gone

    def stop(self) -> None:
        """Terminate the subprocess, escalating to kill if needed.

        Waits up to 0.5 s after terminate() and up to a further 1.0 s after
        kill(). Resources are released even if the child cannot be reaped in
        that time. Does not emit ``process_exited``.
        """
        proc = self._proc
        if proc is None:
            return
        try:
            proc.terminate()
            try:
                proc.wait(timeout=0.5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait(timeout=1.0)
        except OSError as e:
            log.warning("Error stopping process: %s", e)
        except subprocess.TimeoutExpired as e:
            # Still not reaped after kill(); give up waiting but clean up.
            log.warning("Error stopping process: %s", e)
        finally:
            self._exit_code = proc.returncode
            self._cleanup()

    def write(self, data: str | bytes) -> None:
        """Write data to the subprocess stdin (or PTY master).

        Args:
            data: Text (encoded with the default ``str.encode()``) or raw bytes.

        Write failures are logged as warnings, not raised. Does nothing when
        no process is running.
        """
        raw = data.encode() if isinstance(data, str) else data

        fd = self._master_fd
        if fd is not None:
            # The master is non-blocking, so os.write may accept only part of
            # the buffer; keep writing until it is consumed or the OS refuses.
            view = memoryview(raw)
            try:
                while view:
                    sent = os.write(fd, view)
                    view = view[sent:]
            except OSError as e:
                log.warning("PTY write failed: %s", e)
            return

        proc = self._proc
        if proc is None or proc.stdin is None:
            return
        try:
            proc.stdin.write(raw)
            proc.stdin.flush()
        except (OSError, ValueError) as e:
            # OSError covers BrokenPipeError; ValueError is raised when the
            # stdin pipe object has already been closed.
            log.warning("ProcessNode write failed: %s", e)

    def process(self, dt: float) -> None:
        """Poll the child once: forward available output and detect exit.

        Args:
            dt: Frame delta time; not used by this method.

        Signal handlers may call ``stop()`` or ``start()``; when that happens
        the rest of this poll is abandoned because the process it was
        servicing is no longer the current one.
        """
        proc, sel = self._proc, self._sel
        if proc is None or sel is None:
            return

        # Read any available output
        for key, _ in sel.select(timeout=0):
            chunk = self._read_chunk(key)
            if not chunk:
                continue
            self._emit(key.data, chunk)
            if self._proc is not proc:
                return

        # Check for process exit
        if proc.poll() is None:
            return
        self._drain()
        if self._proc is not proc:
            return
        self._exit_code = proc.returncode
        self._cleanup()
        self.process_exited(self._exit_code)

    def _read_chunk(self, key: selectors.SelectorKey) -> bytes | None:
        """Read up to ``_READ_SIZE`` bytes from a ready selector key.

        Returns ``None`` when the read fails with ``OSError`` (or when a
        non-blocking pipe has nothing to deliver).
        """
        try:
            if key.data == "pty":
                return os.read(key.fd, self._READ_SIZE)
            return key.fileobj.read(self._READ_SIZE)
        except OSError:
            return None

    def _emit(self, name: str, chunk: bytes, final: bool = False) -> None:
        """Decode ``chunk`` for stream ``name`` and emit it on the matching signal.

        Bytes of an incomplete UTF-8 sequence are held back until the next
        chunk (or replaced when ``final`` is true). ``"stderr"`` goes to
        ``stderr_data``; every other stream name goes to ``stdout_data``.
        """
        decoder = self._decoders.get(name)
        if decoder is None:
            decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
            self._decoders[name] = decoder
        text = decoder.decode(chunk, final)
        if not text:
            return
        if name == "stderr":
            self.stderr_data(text)
        else:
            self.stdout_data(text)

    def _drain(self) -> None:
        """Read any remaining buffered output after process exits."""
        proc = self._proc
        if proc is None:
            return

        fd = self._master_fd
        if fd is not None:
            while True:
                try:
                    chunk = os.read(fd, self._READ_SIZE)
                except OSError:
                    break  # nothing more to read, or the read failed
                if not chunk:
                    break
                self._emit("pty", chunk)
                if self._proc is not proc:
                    return  # a handler stopped/restarted the process
        else:
            for stream, name in ((proc.stdout, "stdout"), (proc.stderr, "stderr")):
                if stream is None:
                    continue
                try:
                    remaining = stream.read()
                except (OSError, ValueError):
                    remaining = None
                if remaining:
                    self._emit(name, remaining)
                    if self._proc is not proc:
                        return

        # Flush bytes the decoders were holding for an unfinished character.
        for name in tuple(self._decoders):
            self._emit(name, b"", final=True)
            if self._proc is not proc:
                return

    def _cleanup(self) -> None:
        """Close selector, master fd, and release process handle."""
        if self._sel is not None:
            self._sel.close()
            self._sel = None
        if self._master_fd is not None:
            try:
                os.close(self._master_fd)
            except OSError:
                pass
            self._master_fd = None
        if self._proc is not None:
            for stream in (self._proc.stdin, self._proc.stdout, self._proc.stderr):
                if stream:
                    try:
                        stream.close()
                    except OSError:
                        pass
            self._proc = None
        self._decoders = {}

    def exit_tree(self) -> None:
        """Stop the subprocess, if any, when the node leaves the tree."""
        if self._proc is not None:
            self.stop()

    @property
    def running(self) -> bool:
        """True while a process handle exists and ``poll()`` reports no exit."""
        return self._proc is not None and self._proc.poll() is None

    @property
    def exit_code(self) -> int | None:
        """Return code of the current process, else the last recorded exit code."""
        if self._proc is not None:
            return self._proc.returncode
        return self._exit_code