"""DAP JSON-RPC client -- communicates with debugpy over Content-Length framing."""
from __future__ import annotations
import json
import logging
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from simvx.core import Signal
from simvx.core.process_node import ProcessNode
if TYPE_CHECKING:
from ..state import IDEState
log = logging.getLogger(__name__)
[docs]
class DAPClient:
"""Debug Adapter Protocol client using ProcessNode for subprocess I/O.
Launches debugpy's adapter process and exchanges DAP messages over
stdin/stdout with Content-Length framing (identical to LSP).
"""
def __init__(self, state: IDEState, env: dict[str, str] | None = None):
self._state = state
self._proc = ProcessNode(env=env)
self._seq = 1
self._pending: dict[int, Callable] = {}
self._buffer = b""
self._initialized = False
self._configured = False
self._capabilities: dict[str, Any] = {}
self.on_initialized = Signal()
self.on_stopped = Signal()
self.on_continued = Signal()
self.on_terminated = Signal()
self.on_output = Signal()
self.on_thread = Signal()
self.on_breakpoint = Signal()
self.on_run_in_terminal = Signal()
self._proc.stdout_data.connect(self._on_stdout)
self._proc.stderr_data.connect(self._on_stderr)
self._proc.process_exited.connect(self._on_exit)
# -- Lifecycle -------------------------------------------------------------
[docs]
def start(self, python: str = "python"):
"""Launch debugpy adapter and send initialize request."""
self._proc.start(f"{python} -m debugpy.adapter")
self._send_request(
"initialize",
{
"clientID": "simvx-ide",
"clientName": "SimVX IDE",
"adapterID": "debugpy",
"pathFormat": "path",
"linesStartAt1": True,
"columnsStartAt1": True,
"supportsVariableType": True,
"supportsVariablePaging": False,
"supportsRunInTerminalRequest": True,
"locale": "en-us",
},
self._on_initialize_response,
)
[docs]
def stop(self):
"""Terminate the debug session and adapter process."""
if self._proc.running:
self._send_request("disconnect", {"terminateDebuggee": True})
self._proc.stop()
self._reset()
def _reset(self):
self._seq = 1
self._pending.clear()
self._buffer = b""
self._initialized = False
self._configured = False
self._capabilities.clear()
# -- Debug commands --------------------------------------------------------
[docs]
def launch(self, program: str, args: list[str] | None = None, cwd: str | None = None):
"""Launch the target program under the debugger."""
arguments: dict[str, Any] = {
"type": "python",
"request": "launch",
"program": program,
"console": "internalConsole",
"stopOnEntry": False,
"justMyCode": True,
}
if args:
arguments["args"] = args
if cwd:
arguments["cwd"] = cwd
self._send_request("launch", arguments, self._on_launch_response)
[docs]
def set_breakpoints(self, path: str, lines: list[int]):
"""Set breakpoints for a file (replaces all previous breakpoints in that file)."""
self._send_request(
"setBreakpoints",
{
"source": {"path": path},
"breakpoints": [{"line": ln} for ln in lines],
"sourceModified": False,
},
self._on_set_breakpoints_response,
)
[docs]
def continue_execution(self, thread_id: int = 0):
self._send_request("continue", {"threadId": thread_id})
[docs]
def step_over(self, thread_id: int = 0):
self._send_request("next", {"threadId": thread_id, "granularity": "line"})
[docs]
def step_into(self, thread_id: int = 0):
self._send_request("stepIn", {"threadId": thread_id, "granularity": "line"})
[docs]
def step_out(self, thread_id: int = 0):
self._send_request("stepOut", {"threadId": thread_id})
[docs]
def pause(self, thread_id: int = 0):
self._send_request("pause", {"threadId": thread_id})
[docs]
def get_threads(self, callback: Callable | None = None):
self._send_request("threads", {}, callback)
[docs]
def get_stack_trace(self, thread_id: int, callback: Callable | None = None):
self._send_request(
"stackTrace",
{
"threadId": thread_id,
"startFrame": 0,
"levels": 100,
},
callback,
)
[docs]
def get_scopes(self, frame_id: int, callback: Callable | None = None):
self._send_request("scopes", {"frameId": frame_id}, callback)
[docs]
def get_variables(self, variables_ref: int, callback: Callable | None = None):
self._send_request("variables", {"variablesReference": variables_ref}, callback)
[docs]
def evaluate(self, expression: str, frame_id: int, callback: Callable | None = None):
self._send_request(
"evaluate",
{
"expression": expression,
"frameId": frame_id,
"context": "repl",
},
callback,
)
[docs]
def configuration_done(self):
"""Signal that all initial configuration (breakpoints etc.) has been sent."""
self._send_request("configurationDone", {}, self._on_configuration_done_response)
# -- Message framing -------------------------------------------------------
def _send_request(self, command: str, arguments: dict, callback: Callable | None = None):
seq = self._seq
self._seq += 1
msg = {
"seq": seq,
"type": "request",
"command": command,
"arguments": arguments,
}
if callback:
self._pending[seq] = callback
self._send_raw(msg)
def _send_raw(self, msg: dict):
body = json.dumps(msg, separators=(",", ":")).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
self._proc.write(header + body)
# -- I/O handling ----------------------------------------------------------
def _on_stdout(self, text: str):
self._buffer += text.encode("utf-8", errors="replace")
self._try_parse()
def _on_stderr(self, text: str):
log.debug("DAP stderr: %s", text.strip())
def _on_exit(self, code: int):
log.info("DAP adapter exited with code %d", code)
self._state.debug_stopped.emit()
def _try_parse(self):
"""Parse complete messages from the buffer using Content-Length framing."""
while True:
header_end = self._buffer.find(b"\r\n\r\n")
if header_end < 0:
break
header_block = self._buffer[:header_end].decode("ascii", errors="replace")
content_length = 0
for line in header_block.split("\r\n"):
if line.lower().startswith("content-length:"):
content_length = int(line.split(":", 1)[1].strip())
body_start = header_end + 4
if len(self._buffer) < body_start + content_length:
break
body = self._buffer[body_start : body_start + content_length]
self._buffer = self._buffer[body_start + content_length :]
try:
msg = json.loads(body)
except json.JSONDecodeError:
log.error("DAP: malformed JSON: %s", body[:200])
continue
self._dispatch(msg)
def _dispatch(self, msg: dict):
msg_type = msg.get("type")
if msg_type == "response":
self._handle_response(msg)
elif msg_type == "event":
self._handle_event(msg)
elif msg_type == "request":
self._handle_reverse_request(msg)
# -- Response handling -----------------------------------------------------
def _handle_response(self, msg: dict):
req_seq = msg.get("request_seq", 0)
success = msg.get("success", False)
command = msg.get("command", "")
if not success:
error_msg = msg.get("message", "unknown error")
log.error("DAP %s failed: %s", command, error_msg)
callback = self._pending.pop(req_seq, None)
if callback:
callback(msg)
def _handle_reverse_request(self, msg: dict):
"""Handle requests from the adapter (e.g. runInTerminal)."""
command = msg.get("command", "")
args = msg.get("arguments", {})
if command == "runInTerminal":
self._handle_run_in_terminal(msg, args)
else:
log.debug("DAP reverse request: %s (unsupported)", command)
self._send_raw(
{
"seq": self._seq,
"type": "response",
"request_seq": msg.get("seq"),
"success": False,
"command": command,
"message": f"reverse request '{command}' not supported",
}
)
self._seq += 1
def _handle_run_in_terminal(self, msg: dict, args: dict):
"""Handle runInTerminal -- launch command in the IDE terminal."""
cmd_args = args.get("args", [])
cwd = args.get("cwd", "")
env = args.get("env")
title = args.get("title", "Debug")
if not cmd_args:
self._send_raw(
{
"seq": self._seq,
"type": "response",
"request_seq": msg.get("seq"),
"success": False,
"command": "runInTerminal",
"message": "no args provided",
}
)
self._seq += 1
return
import shlex
cmd_str = " ".join(shlex.quote(a) for a in cmd_args)
if cwd:
cmd_str = f"cd {shlex.quote(cwd)} && {cmd_str}"
self.on_run_in_terminal.emit(cmd_str, env or {})
log.info("DAP runInTerminal: %s", cmd_str)
self._send_raw(
{
"seq": self._seq,
"type": "response",
"request_seq": msg.get("seq"),
"success": True,
"command": "runInTerminal",
"body": {},
}
)
self._seq += 1
# -- Event handling --------------------------------------------------------
def _handle_event(self, msg: dict):
event = msg.get("event", "")
body = msg.get("body", {})
if event == "initialized":
self._initialized = True
elif event == "stopped":
self._handle_stopped(body)
elif event == "continued":
self._state.debug_state_changed.emit("running", {})
self.on_continued.emit()
elif event == "terminated":
self._state.debug_stopped.emit()
self.on_terminated.emit()
elif event == "exited":
log.info("Debuggee exited with code %d", body.get("exitCode", -1))
elif event == "output":
category = body.get("category", "console")
text = body.get("output", "")
self._state.debug_output.emit(text, category)
self.on_output.emit(text, category)
elif event == "thread":
self.on_thread.emit(body)
elif event == "breakpoint":
self.on_breakpoint.emit(body)
def _handle_stopped(self, body: dict):
reason = body.get("reason", "")
thread_id = body.get("threadId", 0)
text = body.get("text", "")
description = body.get("description", "")
data = {"reason": reason, "thread_id": thread_id}
self._state.debug_state_changed.emit("stopped", data)
self.on_stopped.emit(body)
if reason == "exception":
# Fetch stack to extract file/line for navigation
self.get_stack_trace(thread_id, lambda resp: self._navigate_to_exception(resp, text or description))
def _navigate_to_exception(self, response: dict, message: str):
body = response.get("body", {})
frames = body.get("stackFrames", [])
if not frames:
return
top = frames[0]
source = top.get("source", {})
path = source.get("path", "")
line = top.get("line", 0)
if path and line:
self._state.exception_occurred.emit(path, line, message)
self._state.goto_requested.emit(path, line, 0)
# -- Initialize/launch callbacks -------------------------------------------
def _on_initialize_response(self, msg: dict):
if msg.get("success"):
self._capabilities = msg.get("body", {})
log.info("DAP initialized, capabilities: %s", list(self._capabilities))
self.on_initialized.emit()
def _on_launch_response(self, msg: dict):
if msg.get("success"):
self._state.debug_started.emit()
self._state.debug_state_changed.emit("running", {})
def _on_set_breakpoints_response(self, msg: dict):
body = msg.get("body", {})
breakpoints = body.get("breakpoints", [])
for bp in breakpoints:
if not bp.get("verified", False):
log.debug("Breakpoint not verified: line %d", bp.get("line", 0))
def _on_configuration_done_response(self, msg: dict):
if msg.get("success"):
self._configured = True
# -- Properties ------------------------------------------------------------
@property
def running(self) -> bool:
return self._proc.running
@property
def initialized(self) -> bool:
return self._initialized
@property
def configured(self) -> bool:
return self._configured
[docs]
def process(self, dt: float):
"""Tick the underlying ProcessNode to poll I/O."""
self._proc.process(dt)