Source code for simvx.ide.dap.manager

"""Debug session and breakpoint manager -- coordinates DAPClient with IDE state."""


from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

from simvx.core import Signal

from .client import DAPClient

if TYPE_CHECKING:
    from ..config import IDEConfig
    from ..state import IDEState

log = logging.getLogger(__name__)


class DebugManager:
    """High-level debug session manager.

    Creates/destroys DAPClient instances per session, syncs breakpoints from
    IDEState, and caches thread/stack/scope/variable data for the UI.
    """

    def __init__(self, state: IDEState, config: IDEConfig):
        """Store the IDE state/config and subscribe to the state's debug signals."""
        self._state = state
        self._config = config
        self._client: DAPClient | None = None
        self._debug_state: str = "idle"  # idle | running | stopped
        self._current_thread_id: int = 0
        self._current_frame_id: int = 0
        self._current_file: str = ""
        self._current_line: int = 0
        self._launch_path: str = ""

        # Breakpoint conditions: path -> {line: condition_expr}
        self._conditions: dict[str, dict[int, str]] = {}

        # Signal forwarded from DAPClient when debugpy requests runInTerminal
        self.on_run_in_terminal = Signal()

        # Cached data for UI
        self._threads: list[dict[str, Any]] = []
        self._stack_frames: list[dict[str, Any]] = []
        self._scopes: list[dict[str, Any]] = []
        self._variables: dict[int, list[dict[str, Any]]] = {}

        # Connect to state signals
        self._state.debug_state_changed.connect(self._on_debug_state_changed)
        self._state.breakpoint_toggled.connect(self._on_breakpoint_toggled)

    # -- Public API ------------------------------------------------------------

    def start_debug(self, path: str) -> None:
        """Start debugging a Python file.

        Any session that is still running is stopped first. The interpreter
        command and environment come from the IDE config for the current
        project root.
        """
        if self._client and self._client.running:
            self.stop_debug()
        self._clear_cache()

        # Record the target *before* the adapter starts: _after_initialize()
        # reads it and must never see the previous session's path.
        self._launch_path = path

        root = self._state.project_root
        env = self._config.get_env(root)
        python = self._config.get_python_command(root)

        client = DAPClient(self._state, env=env)
        client.on_initialized.connect(self._after_initialize)
        client.on_stopped.connect(self._on_stopped_event)
        client.on_terminated.connect(self._on_terminated)
        client.on_run_in_terminal.connect(
            lambda command, term_env: self.on_run_in_terminal.emit(command, term_env)
        )
        self._client = client
        client.start(python=python)

    def stop_debug(self) -> None:
        """Stop the current debug session."""
        if self._client:
            self._client.stop()
            self._client = None
        self._debug_state = "idle"
        self._clear_cache()

    def continue_execution(self) -> None:
        """Resume the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.continue_execution(self._current_thread_id)

    def step_over(self) -> None:
        """Step over on the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.step_over(self._current_thread_id)

    def step_into(self) -> None:
        """Step into on the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.step_into(self._current_thread_id)

    def step_out(self) -> None:
        """Step out on the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.step_out(self._current_thread_id)

    def pause(self) -> None:
        """Pause the current thread; ignored unless the session is running."""
        if self._client and self._debug_state == "running":
            self._client.pause(self._current_thread_id)

    def toggle_breakpoint(self, path: str, line: int) -> None:
        """Toggle a breakpoint through IDEState (which owns the breakpoint set)."""
        self._state.toggle_breakpoint(path, line)

    def evaluate(self, expr: str, callback=None) -> None:
        """Evaluate expression in the current frame context.

        If *callback* is provided, the parsed result string (or dict on error)
        is forwarded to it instead of being emitted to debug_output.

        Nothing happens unless the session is stopped and a frame is selected.
        """
        if not (self._client and self._debug_state == "stopped" and self._current_frame_id):
            return

        if not callback:
            self._client.evaluate(expr, self._current_frame_id, self._on_evaluate_response)
            return

        def _wrap(msg):
            body = self._body(msg)
            if msg.get("success"):
                callback(body.get("result", str(body)))
            else:
                callback({"error": msg.get("message", "eval failed"), "result": msg.get("message", "")})

        self._client.evaluate(expr, self._current_frame_id, _wrap)

    def set_breakpoint_condition(self, path: str, line: int, condition: str) -> None:
        """Set or clear a condition on a breakpoint.

        An empty *condition* removes the stored condition for that line.
        """
        if condition:
            self._conditions.setdefault(path, {})[line] = condition
        else:
            conds = self._conditions.get(path, {})
            conds.pop(line, None)
            if not conds:
                # Drop the per-file entry once its last condition is gone.
                self._conditions.pop(path, None)

        # Re-sync breakpoints with adapter if live
        if self._client and self._client.running:
            lines = sorted(self._state.get_breakpoints(path))
            self._set_breakpoints_with_conditions(path, lines)

    def get_breakpoint_condition(self, path: str, line: int) -> str:
        """Get the condition for a breakpoint, or empty string."""
        return self._conditions.get(path, {}).get(line, "")

    def get_conditions_for_file(self, path: str) -> dict[int, str]:
        """Get all conditions for a file as {line: condition} (a copy)."""
        return dict(self._conditions.get(path, {}))

    def _set_breakpoints_with_conditions(self, path: str, lines: list[int]) -> None:
        """Send setBreakpoints with optional conditions from our local store.

        Callers must ensure a client exists.
        """
        conds = self._conditions.get(path, {})
        if not any(conds.get(ln) for ln in lines):
            # Fast path: no conditions, use standard API
            self._client.set_breakpoints(path, lines)
            return

        # Build breakpoint list with conditions and send directly
        bp_list = []
        for ln in lines:
            bp: dict[str, Any] = {"line": ln}
            cond = conds.get(ln, "")
            if cond:
                bp["condition"] = cond
            bp_list.append(bp)

        self._client._send_request(
            "setBreakpoints",
            {
                "source": {"path": path},
                "breakpoints": bp_list,
                "sourceModified": False,
            },
            self._client._on_set_breakpoints_response,
        )

    def select_frame(self, frame_index: int) -> None:
        """Select a stack frame by index, refreshing scopes and variables.

        Out-of-range indices are ignored.
        """
        if frame_index < 0 or frame_index >= len(self._stack_frames):
            return
        self._activate_frame(self._stack_frames[frame_index])

    def fetch_variables(self, variables_ref: int, callback=None) -> None:
        """Fetch variables for a scope or nested object.

        The response is cached under *variables_ref*; *callback*, if given,
        receives the variable list.
        """
        if not self._client:
            return

        def on_vars(resp):
            var_list = self._body(resp).get("variables", [])
            self._variables[variables_ref] = var_list
            if callback:
                callback(var_list)

        self._client.get_variables(variables_ref, on_vars)

    def process(self, dt: float) -> None:
        """Tick the DAP client to poll subprocess I/O."""
        if self._client:
            self._client.process(dt)

    # -- Properties ------------------------------------------------------------

    @property
    def is_debugging(self) -> bool:
        """True whenever the debug state is anything other than "idle"."""
        return self._debug_state != "idle"

    @property
    def debug_state(self) -> str:
        """Current debug state: "idle", "running" or "stopped"."""
        return self._debug_state

    @property
    def current_file(self) -> str:
        """Source path of the selected frame, or "" if none."""
        return self._current_file

    @property
    def current_line(self) -> int:
        """Line of the selected frame, or 0 if none."""
        return self._current_line

    @property
    def threads(self) -> list[dict[str, Any]]:
        """Cached thread entries from the last threads response."""
        return self._threads

    @property
    def stack_frames(self) -> list[dict[str, Any]]:
        """Cached stack frames from the last stackTrace response."""
        return self._stack_frames

    @property
    def scopes(self) -> list[dict[str, Any]]:
        """Cached scopes of the selected frame."""
        return self._scopes

    @property
    def variables(self) -> dict[int, list[dict[str, Any]]]:
        """Cached variables keyed by variablesReference."""
        return self._variables

    # -- Internal: launch sequence ---------------------------------------------

    def _after_initialize(self) -> None:
        """Called after initialize response -- set breakpoints, launch, configurationDone."""
        if not self._client:
            # The session was torn down before the adapter answered.
            return

        # Sync all breakpoints (with conditions if any)
        for path, lines in self._state.get_all_breakpoints().items():
            if lines:
                self._set_breakpoints_with_conditions(path, sorted(lines))

        # Launch target
        self._client.launch(self._launch_path)

        # Signal configuration complete
        self._client.configuration_done()

    # -- Internal: event handlers ----------------------------------------------

    def _on_debug_state_changed(self, state_name: str, data: dict) -> None:
        """Mirror IDEState's debug state; on a stop, refresh threads and stack."""
        self._debug_state = state_name
        if state_name != "stopped":
            return
        thread_id = data.get("thread_id", 0)
        if thread_id:
            self._current_thread_id = thread_id
        self._fetch_threads()
        # No-op while the thread id is still unknown; _on_threads_response
        # retries once it has adopted one.
        self._fetch_stack_trace()

    def _on_stopped_event(self, body: dict) -> None:
        """Handle the client's stopped event (currently nothing to do here)."""
        reason = body.get("reason", "")
        if reason == "exception":
            pass  # Exception navigation handled by DAPClient._navigate_to_exception

    def _on_terminated(self) -> None:
        """Reset to idle and drop cached data when the debuggee terminates."""
        self._debug_state = "idle"
        self._clear_cache()

    def _on_breakpoint_toggled(self, path: str, line: int) -> None:
        """Re-sync breakpoints for the file when toggled during a live session."""
        if not self._client or not self._client.running:
            return
        lines = sorted(self._state.get_breakpoints(path))
        self._set_breakpoints_with_conditions(path, lines)

    def _on_evaluate_response(self, msg: dict) -> None:
        """Emit an evaluation result (or the adapter's error) to debug_output."""
        result = self._body(msg).get("result", "")
        if result:
            self._state.debug_output.emit(result + "\n", "console")
        elif msg.get("success") is False:
            # Surface failures instead of dropping them silently, matching
            # what the callback path of evaluate() reports.
            message = msg.get("message") or "eval failed"
            self._state.debug_output.emit(message + "\n", "console")

    # -- Internal: data fetching -----------------------------------------------

    @staticmethod
    def _body(msg: dict) -> dict:
        """Return the message's ``body``, or an empty dict if missing or null."""
        return msg.get("body") or {}

    def _activate_frame(self, frame: dict[str, Any]) -> None:
        """Make *frame* current, ask the editor to jump to it, and fetch its scopes."""
        self._current_frame_id = frame.get("id", 0)
        source = frame.get("source", {})
        self._current_file = source.get("path", "")
        self._current_line = frame.get("line", 0)
        if self._current_file and self._current_line:
            self._state.goto_requested.emit(self._current_file, self._current_line, 0)
        self._fetch_scopes()

    def _fetch_threads(self) -> None:
        """Request the thread list from the adapter."""
        if not self._client:
            return
        self._client.get_threads(self._on_threads_response)

    def _on_threads_response(self, msg: dict) -> None:
        """Cache the thread list, adopting the first thread if none is current."""
        self._threads = self._body(msg).get("threads", [])
        if self._threads and not self._current_thread_id:
            self._current_thread_id = self._threads[0].get("id", 0)
            # The stop that triggered this request had no usable thread id, so
            # the stack-trace fetch issued alongside it was skipped. Retry now
            # that a thread is known, otherwise the UI never gets a stack.
            if self._debug_state == "stopped":
                self._fetch_stack_trace()

    def _fetch_stack_trace(self) -> None:
        """Request the stack trace of the current thread, if one is known."""
        if not self._client or not self._current_thread_id:
            return
        self._client.get_stack_trace(self._current_thread_id, self._on_stack_trace_response)

    def _on_stack_trace_response(self, msg: dict) -> None:
        """Cache the stack frames and select the top frame."""
        self._stack_frames = self._body(msg).get("stackFrames", [])
        if self._stack_frames:
            self._activate_frame(self._stack_frames[0])

    def _fetch_scopes(self) -> None:
        """Request the scopes of the current frame, if one is selected."""
        if not self._client or not self._current_frame_id:
            return
        self._client.get_scopes(self._current_frame_id, self._on_scopes_response)

    def _on_scopes_response(self, msg: dict) -> None:
        """Cache the scopes and fetch variables for each scope that has a reference."""
        self._scopes = self._body(msg).get("scopes", [])
        self._variables.clear()
        for scope in self._scopes:
            ref = scope.get("variablesReference", 0)
            if ref:
                self.fetch_variables(ref)

    # -- Cache -----------------------------------------------------------------

    def _clear_cache(self) -> None:
        """Empty all cached UI data and reset the current thread/frame/location."""
        self._threads.clear()
        self._stack_frames.clear()
        self._scopes.clear()
        self._variables.clear()
        self._current_thread_id = 0
        self._current_frame_id = 0
        self._current_file = ""
        self._current_line = 0