Source code for simvx.ide.dap.client

"""DAP JSON-RPC client -- communicates with debugpy over Content-Length framing."""


from __future__ import annotations

import json
import logging
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from simvx.core import Signal
from simvx.core.process_node import ProcessNode

if TYPE_CHECKING:
    from ..state import IDEState

log = logging.getLogger(__name__)


[docs] class DAPClient: """Debug Adapter Protocol client using ProcessNode for subprocess I/O. Launches debugpy's adapter process and exchanges DAP messages over stdin/stdout with Content-Length framing (identical to LSP). """ def __init__(self, state: IDEState, env: dict[str, str] | None = None): self._state = state self._proc = ProcessNode(env=env) self._seq = 1 self._pending: dict[int, Callable] = {} self._buffer = b"" self._initialized = False self._configured = False self._capabilities: dict[str, Any] = {} self.on_initialized = Signal() self.on_stopped = Signal() self.on_continued = Signal() self.on_terminated = Signal() self.on_output = Signal() self.on_thread = Signal() self.on_breakpoint = Signal() self.on_run_in_terminal = Signal() self._proc.stdout_data.connect(self._on_stdout) self._proc.stderr_data.connect(self._on_stderr) self._proc.process_exited.connect(self._on_exit) # -- Lifecycle -------------------------------------------------------------
[docs] def start(self, python: str = "python"): """Launch debugpy adapter and send initialize request.""" self._proc.start(f"{python} -m debugpy.adapter") self._send_request( "initialize", { "clientID": "simvx-ide", "clientName": "SimVX IDE", "adapterID": "debugpy", "pathFormat": "path", "linesStartAt1": True, "columnsStartAt1": True, "supportsVariableType": True, "supportsVariablePaging": False, "supportsRunInTerminalRequest": True, "locale": "en-us", }, self._on_initialize_response, )
[docs] def stop(self): """Terminate the debug session and adapter process.""" if self._proc.running: self._send_request("disconnect", {"terminateDebuggee": True}) self._proc.stop() self._reset()
def _reset(self): self._seq = 1 self._pending.clear() self._buffer = b"" self._initialized = False self._configured = False self._capabilities.clear() # -- Debug commands --------------------------------------------------------
[docs] def launch(self, program: str, args: list[str] | None = None, cwd: str | None = None): """Launch the target program under the debugger.""" arguments: dict[str, Any] = { "type": "python", "request": "launch", "program": program, "console": "internalConsole", "stopOnEntry": False, "justMyCode": True, } if args: arguments["args"] = args if cwd: arguments["cwd"] = cwd self._send_request("launch", arguments, self._on_launch_response)
[docs] def set_breakpoints(self, path: str, lines: list[int]): """Set breakpoints for a file (replaces all previous breakpoints in that file).""" self._send_request( "setBreakpoints", { "source": {"path": path}, "breakpoints": [{"line": ln} for ln in lines], "sourceModified": False, }, self._on_set_breakpoints_response, )
[docs] def continue_execution(self, thread_id: int = 0): self._send_request("continue", {"threadId": thread_id})
[docs] def step_over(self, thread_id: int = 0): self._send_request("next", {"threadId": thread_id, "granularity": "line"})
[docs] def step_into(self, thread_id: int = 0): self._send_request("stepIn", {"threadId": thread_id, "granularity": "line"})
[docs] def step_out(self, thread_id: int = 0): self._send_request("stepOut", {"threadId": thread_id})
[docs] def pause(self, thread_id: int = 0): self._send_request("pause", {"threadId": thread_id})
[docs] def get_threads(self, callback: Callable | None = None): self._send_request("threads", {}, callback)
[docs] def get_stack_trace(self, thread_id: int, callback: Callable | None = None): self._send_request( "stackTrace", { "threadId": thread_id, "startFrame": 0, "levels": 100, }, callback, )
[docs] def get_scopes(self, frame_id: int, callback: Callable | None = None): self._send_request("scopes", {"frameId": frame_id}, callback)
[docs] def get_variables(self, variables_ref: int, callback: Callable | None = None): self._send_request("variables", {"variablesReference": variables_ref}, callback)
[docs] def evaluate(self, expression: str, frame_id: int, callback: Callable | None = None): self._send_request( "evaluate", { "expression": expression, "frameId": frame_id, "context": "repl", }, callback, )
[docs] def configuration_done(self): """Signal that all initial configuration (breakpoints etc.) has been sent.""" self._send_request("configurationDone", {}, self._on_configuration_done_response)
# -- Message framing ------------------------------------------------------- def _send_request(self, command: str, arguments: dict, callback: Callable | None = None): seq = self._seq self._seq += 1 msg = { "seq": seq, "type": "request", "command": command, "arguments": arguments, } if callback: self._pending[seq] = callback self._send_raw(msg) def _send_raw(self, msg: dict): body = json.dumps(msg, separators=(",", ":")).encode("utf-8") header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii") self._proc.write(header + body) # -- I/O handling ---------------------------------------------------------- def _on_stdout(self, text: str): self._buffer += text.encode("utf-8", errors="replace") self._try_parse() def _on_stderr(self, text: str): log.debug("DAP stderr: %s", text.strip()) def _on_exit(self, code: int): log.info("DAP adapter exited with code %d", code) self._state.debug_stopped.emit() def _try_parse(self): """Parse complete messages from the buffer using Content-Length framing.""" while True: header_end = self._buffer.find(b"\r\n\r\n") if header_end < 0: break header_block = self._buffer[:header_end].decode("ascii", errors="replace") content_length = 0 for line in header_block.split("\r\n"): if line.lower().startswith("content-length:"): content_length = int(line.split(":", 1)[1].strip()) body_start = header_end + 4 if len(self._buffer) < body_start + content_length: break body = self._buffer[body_start : body_start + content_length] self._buffer = self._buffer[body_start + content_length :] try: msg = json.loads(body) except json.JSONDecodeError: log.error("DAP: malformed JSON: %s", body[:200]) continue self._dispatch(msg) def _dispatch(self, msg: dict): msg_type = msg.get("type") if msg_type == "response": self._handle_response(msg) elif msg_type == "event": self._handle_event(msg) elif msg_type == "request": self._handle_reverse_request(msg) # -- Response handling ----------------------------------------------------- def _handle_response(self, msg: dict): req_seq = msg.get("request_seq", 0) success = msg.get("success", False) command = msg.get("command", "") if not success: error_msg = msg.get("message", "unknown error") log.error("DAP %s failed: %s", command, error_msg) callback = self._pending.pop(req_seq, None) if callback: callback(msg) def _handle_reverse_request(self, msg: dict): """Handle requests from the adapter (e.g. runInTerminal).""" command = msg.get("command", "") args = msg.get("arguments", {}) if command == "runInTerminal": self._handle_run_in_terminal(msg, args) else: log.debug("DAP reverse request: %s (unsupported)", command) self._send_raw( { "seq": self._seq, "type": "response", "request_seq": msg.get("seq"), "success": False, "command": command, "message": f"reverse request '{command}' not supported", } ) self._seq += 1 def _handle_run_in_terminal(self, msg: dict, args: dict): """Handle runInTerminal -- launch command in the IDE terminal.""" cmd_args = args.get("args", []) cwd = args.get("cwd", "") env = args.get("env") title = args.get("title", "Debug") if not cmd_args: self._send_raw( { "seq": self._seq, "type": "response", "request_seq": msg.get("seq"), "success": False, "command": "runInTerminal", "message": "no args provided", } ) self._seq += 1 return import shlex cmd_str = " ".join(shlex.quote(a) for a in cmd_args) if cwd: cmd_str = f"cd {shlex.quote(cwd)} && {cmd_str}" self.on_run_in_terminal.emit(cmd_str, env or {}) log.info("DAP runInTerminal: %s", cmd_str) self._send_raw( { "seq": self._seq, "type": "response", "request_seq": msg.get("seq"), "success": True, "command": "runInTerminal", "body": {}, } ) self._seq += 1 # -- Event handling -------------------------------------------------------- def _handle_event(self, msg: dict): event = msg.get("event", "") body = msg.get("body", {}) if event == "initialized": self._initialized = True elif event == "stopped": self._handle_stopped(body) elif event == "continued": self._state.debug_state_changed.emit("running", {}) self.on_continued.emit() elif event == "terminated": self._state.debug_stopped.emit() self.on_terminated.emit() elif event == "exited": log.info("Debuggee exited with code %d", body.get("exitCode", -1)) elif event == "output": category = body.get("category", "console") text = body.get("output", "") self._state.debug_output.emit(text, category) self.on_output.emit(text, category) elif event == "thread": self.on_thread.emit(body) elif event == "breakpoint": self.on_breakpoint.emit(body) def _handle_stopped(self, body: dict): reason = body.get("reason", "") thread_id = body.get("threadId", 0) text = body.get("text", "") description = body.get("description", "") data = {"reason": reason, "thread_id": thread_id} self._state.debug_state_changed.emit("stopped", data) self.on_stopped.emit(body) if reason == "exception": # Fetch stack to extract file/line for navigation self.get_stack_trace(thread_id, lambda resp: self._navigate_to_exception(resp, text or description)) def _navigate_to_exception(self, response: dict, message: str): body = response.get("body", {}) frames = body.get("stackFrames", []) if not frames: return top = frames[0] source = top.get("source", {}) path = source.get("path", "") line = top.get("line", 0) if path and line: self._state.exception_occurred.emit(path, line, message) self._state.goto_requested.emit(path, line, 0) # -- Initialize/launch callbacks ------------------------------------------- def _on_initialize_response(self, msg: dict): if msg.get("success"): self._capabilities = msg.get("body", {}) log.info("DAP initialized, capabilities: %s", list(self._capabilities)) self.on_initialized.emit() def _on_launch_response(self, msg: dict): if msg.get("success"): self._state.debug_started.emit() self._state.debug_state_changed.emit("running", {}) def _on_set_breakpoints_response(self, msg: dict): body = msg.get("body", {}) breakpoints = body.get("breakpoints", []) for bp in breakpoints: if not bp.get("verified", False): log.debug("Breakpoint not verified: line %d", bp.get("line", 0)) def _on_configuration_done_response(self, msg: dict): if msg.get("success"): self._configured = True # -- Properties ------------------------------------------------------------ @property def running(self) -> bool: return self._proc.running @property def initialized(self) -> bool: return self._initialized @property def configured(self) -> bool: return self._configured
[docs] def process(self, dt: float): """Tick the underlying ProcessNode to poll I/O.""" self._proc.process(dt)