"""Debug session and breakpoint manager -- coordinates DAPClient with IDE state."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from simvx.core import Signal
from .client import DAPClient
if TYPE_CHECKING:
    # Used only in type annotations, so these are never imported at runtime.
    from ..config import IDEConfig
    from ..state import IDEState

# Module-level logger named after this module.
log = logging.getLogger(__name__)
class DebugManager:
    """High-level debug session manager.

    Creates/destroys DAPClient instances per session, syncs breakpoints
    from IDEState, and caches thread/stack/scope/variable data for the UI.
    """

    def __init__(self, state: IDEState, config: IDEConfig):
        """Bind the manager to the IDE state and configuration.

        No debug adapter is started here: the constructor only initialises
        the caches and subscribes to the state's ``debug_state_changed`` and
        ``breakpoint_toggled`` signals.
        """
        self._state = state
        self._config = config
        self._client: DAPClient | None = None
        # Starts as "idle"; afterwards mirrors whatever name the state's
        # debug_state_changed signal reports (idle | running | stopped).
        self._debug_state: str = "idle"
        self._current_thread_id: int = 0
        self._current_frame_id: int = 0
        self._current_file: str = ""
        self._current_line: int = 0
        self._launch_path: str = ""

        # Breakpoint conditions: path -> {line: condition_expr}
        self._conditions: dict[str, dict[int, str]] = {}

        # Signal forwarded from DAPClient when debugpy requests runInTerminal
        self.on_run_in_terminal = Signal()

        # Cached data for UI
        self._threads: list[dict[str, Any]] = []
        self._stack_frames: list[dict[str, Any]] = []
        self._scopes: list[dict[str, Any]] = []
        self._variables: dict[int, list[dict[str, Any]]] = {}

        # Connect to state signals
        self._state.debug_state_changed.connect(self._on_debug_state_changed)
        self._state.breakpoint_toggled.connect(self._on_breakpoint_toggled)

    # -- Public API ------------------------------------------------------------

    def start_debug(self, path: str) -> None:
        """Start debugging a Python file.

        A session that is still running is stopped first. The launch itself
        happens later, in ``_after_initialize``, once the client reports
        that it has been initialised.
        """
        if self._client and self._client.running:
            self.stop_debug()
        self._clear_cache()

        # Record the target *before* starting the client: _after_initialize
        # reads it, so it must never observe the previous session's path.
        self._launch_path = path

        env = self._config.get_env(self._state.project_root)
        python = self._config.get_python_command(self._state.project_root)
        self._client = DAPClient(self._state, env=env)
        self._client.on_initialized.connect(self._after_initialize)
        self._client.on_stopped.connect(self._on_stopped_event)
        self._client.on_terminated.connect(self._on_terminated)
        self._client.on_run_in_terminal.connect(lambda cmd, env: self.on_run_in_terminal.emit(cmd, env))
        self._client.start(python=python)

    def stop_debug(self) -> None:
        """Stop the current debug session and reset all cached session data."""
        if self._client:
            self._client.stop()
            self._client = None
        self._debug_state = "idle"
        self._clear_cache()

    def continue_execution(self) -> None:
        """Resume the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.continue_execution(self._current_thread_id)

    def step_over(self) -> None:
        """Step over on the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.step_over(self._current_thread_id)

    def step_into(self) -> None:
        """Step into on the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.step_into(self._current_thread_id)

    def step_out(self) -> None:
        """Step out on the current thread; ignored unless the session is stopped."""
        if self._client and self._debug_state == "stopped":
            self._client.step_out(self._current_thread_id)

    def pause(self) -> None:
        """Pause the current thread; ignored unless the session is running."""
        if self._client and self._debug_state == "running":
            self._client.pause(self._current_thread_id)

    def toggle_breakpoint(self, path: str, line: int) -> None:
        """Toggle a breakpoint by delegating to the IDE state."""
        self._state.toggle_breakpoint(path, line)

    def evaluate(self, expr: str, callback=None) -> None:
        """Evaluate expression in the current frame context.

        If *callback* is provided, the parsed result string (or dict on error)
        is forwarded to it instead of being emitted to debug_output.

        Nothing happens (and *callback* is not invoked) unless a session is
        stopped and a frame is selected.
        """
        if not (self._client and self._debug_state == "stopped" and self._current_frame_id):
            return

        if callback is None:
            self._client.evaluate(expr, self._current_frame_id, self._on_evaluate_response)
            return

        def _forward(msg):
            # Success -> result string; failure -> dict carrying the message.
            if msg.get("success"):
                body = msg.get("body") or {}
                callback(body.get("result", str(body)))
            else:
                callback({"error": msg.get("message", "eval failed"), "result": msg.get("message", "")})

        self._client.evaluate(expr, self._current_frame_id, _forward)

    def set_breakpoint_condition(self, path: str, line: int, condition: str) -> None:
        """Set or clear a condition on a breakpoint.

        An empty *condition* removes the stored condition. When a session is
        live, the file's breakpoints are re-sent so the change takes effect.
        """
        if condition:
            self._conditions.setdefault(path, {})[line] = condition
        else:
            conds = self._conditions.get(path, {})
            conds.pop(line, None)
            if not conds:
                # Do not keep empty per-file entries around.
                self._conditions.pop(path, None)

        # Re-sync breakpoints with adapter if live
        if self._client and self._client.running:
            lines = sorted(self._state.get_breakpoints(path))
            self._set_breakpoints_with_conditions(path, lines)

    def get_breakpoint_condition(self, path: str, line: int) -> str:
        """Get the condition for a breakpoint, or empty string."""
        return self._conditions.get(path, {}).get(line, "")

    def get_conditions_for_file(self, path: str) -> dict[int, str]:
        """Get all conditions for a file as {line: condition} (a copy)."""
        return dict(self._conditions.get(path, {}))

    def _set_breakpoints_with_conditions(self, path: str, lines: list[int]) -> None:
        """Send setBreakpoints with optional conditions from our local store."""
        client = self._client
        if client is None:
            # No session: nothing to sync with.
            return

        conds = self._conditions.get(path, {})
        if not any(conds.get(ln) for ln in lines):
            # Fast path: no conditions, use standard API
            client.set_breakpoints(path, lines)
            return

        # Build breakpoint list with conditions and send directly.
        # NOTE(review): this reaches into DAPClient's private request API
        # because set_breakpoints() is only called with plain line numbers here.
        bp_list: list[dict[str, Any]] = [
            {"line": ln, "condition": conds[ln]} if conds.get(ln) else {"line": ln}
            for ln in lines
        ]
        client._send_request(
            "setBreakpoints",
            {
                "source": {"path": path},
                "breakpoints": bp_list,
                "sourceModified": False,
            },
            client._on_set_breakpoints_response,
        )

    def select_frame(self, frame_index: int) -> None:
        """Select a stack frame by index, refreshing scopes and variables.

        Out-of-range indices are ignored.
        """
        if frame_index < 0 or frame_index >= len(self._stack_frames):
            return
        self._activate_frame(self._stack_frames[frame_index])

    def fetch_variables(self, variables_ref: int, callback=None) -> None:
        """Fetch variables for a scope or nested object.

        The result is cached under *variables_ref* and, if given, *callback*
        is called with the variable list. No-op without a client.
        """
        if not self._client:
            return

        def on_vars(resp):
            body = resp.get("body") or {}
            var_list = body.get("variables", [])
            self._variables[variables_ref] = var_list
            if callback:
                callback(var_list)

        self._client.get_variables(variables_ref, on_vars)

    # -- Properties ------------------------------------------------------------

    @property
    def is_debugging(self) -> bool:
        """True whenever the debug state is anything other than "idle"."""
        return self._debug_state != "idle"

    @property
    def debug_state(self) -> str:
        """Current debug state name."""
        return self._debug_state

    @property
    def current_file(self) -> str:
        """Source path of the selected frame, or "" when none."""
        return self._current_file

    @property
    def current_line(self) -> int:
        """Line of the selected frame, or 0 when none."""
        return self._current_line

    @property
    def threads(self) -> list[dict[str, Any]]:
        """Cached thread list (the internal list, not a copy)."""
        return self._threads

    @property
    def stack_frames(self) -> list[dict[str, Any]]:
        """Cached stack frames (the internal list, not a copy)."""
        return self._stack_frames

    @property
    def scopes(self) -> list[dict[str, Any]]:
        """Cached scopes of the selected frame (the internal list, not a copy)."""
        return self._scopes

    @property
    def variables(self) -> dict[int, list[dict[str, Any]]]:
        """Cached variables keyed by variablesReference (the internal dict)."""
        return self._variables

    # -- Internal: launch sequence ---------------------------------------------

    def _after_initialize(self) -> None:
        """Called after initialize response -- set breakpoints, launch, configurationDone."""
        if self._client is None:
            # Session was torn down before the adapter finished initialising.
            return

        # Sync all breakpoints (with conditions if any)
        for path, lines in self._state.get_all_breakpoints().items():
            if lines:
                self._set_breakpoints_with_conditions(path, sorted(lines))

        # Launch target
        self._client.launch(self._launch_path)
        # Signal configuration complete
        self._client.configuration_done()

    # -- Internal: event handlers ----------------------------------------------

    def _on_debug_state_changed(self, state_name: str, data: dict) -> None:
        """Mirror the IDE debug state; on "stopped", refresh threads and stack."""
        self._debug_state = state_name
        if state_name != "stopped":
            return
        thread_id = (data or {}).get("thread_id", 0)
        if thread_id:
            self._current_thread_id = thread_id
        self._fetch_threads()
        # Skipped while no thread id is known; _on_threads_response retries.
        self._fetch_stack_trace()

    def _on_stopped_event(self, body: dict) -> None:
        """Handle the client's stopped event (intentionally a no-op)."""
        # Exception navigation handled by DAPClient._navigate_to_exception

    def _on_terminated(self) -> None:
        """Reset state and caches when the client reports termination."""
        self._debug_state = "idle"
        self._clear_cache()

    def _on_breakpoint_toggled(self, path: str, line: int) -> None:
        """Re-sync breakpoints for the file when toggled during a live session."""
        if not self._client or not self._client.running:
            return
        lines = sorted(self._state.get_breakpoints(path))
        self._set_breakpoints_with_conditions(path, lines)

    def _on_evaluate_response(self, msg: dict) -> None:
        """Emit an evaluation result (or failure message) to debug_output."""
        body = msg.get("body") or {}
        result = body.get("result", "")
        if result:
            self._state.debug_output.emit(result + "\n", "console")
        elif msg.get("success") is False:
            # Report failures instead of dropping them, matching what the
            # callback path of evaluate() forwards.
            self._state.debug_output.emit(msg.get("message", "eval failed") + "\n", "console")

    # -- Internal: data fetching -----------------------------------------------

    def _fetch_threads(self) -> None:
        """Request the thread list from the client, if any."""
        if not self._client:
            return
        self._client.get_threads(self._on_threads_response)

    def _on_threads_response(self, msg: dict) -> None:
        """Cache the thread list and pick a default thread when none is known."""
        body = msg.get("body") or {}
        self._threads = body.get("threads", [])
        if self._current_thread_id or not self._threads:
            return
        self._current_thread_id = self._threads[0].get("id", 0)
        # The stop event carried no thread id, so the stack request issued by
        # _on_debug_state_changed was skipped; issue it now that one is known.
        if self._debug_state == "stopped":
            self._fetch_stack_trace()

    def _fetch_stack_trace(self) -> None:
        """Request the stack trace for the current thread, if both are known."""
        if not self._client or not self._current_thread_id:
            return
        self._client.get_stack_trace(self._current_thread_id, self._on_stack_trace_response)

    def _on_stack_trace_response(self, msg: dict) -> None:
        """Cache the stack frames and activate the top frame, if present."""
        body = msg.get("body") or {}
        self._stack_frames = body.get("stackFrames", [])
        if self._stack_frames:
            self._activate_frame(self._stack_frames[0])

    def _activate_frame(self, frame: dict[str, Any]) -> None:
        """Make *frame* current, ask the editor to show it, and fetch its scopes."""
        self._current_frame_id = frame.get("id", 0)
        source = frame.get("source") or {}
        self._current_file = source.get("path", "")
        self._current_line = frame.get("line", 0)
        if self._current_file and self._current_line:
            self._state.goto_requested.emit(self._current_file, self._current_line, 0)
        self._fetch_scopes()

    def _fetch_scopes(self) -> None:
        """Request scopes for the current frame, if both client and frame exist."""
        if not self._client or not self._current_frame_id:
            return
        self._client.get_scopes(self._current_frame_id, self._on_scopes_response)

    def _on_scopes_response(self, msg: dict) -> None:
        """Cache the scopes and fetch the variables of each scope."""
        body = msg.get("body") or {}
        self._scopes = body.get("scopes", [])
        # Variables cached for the previously selected frame are discarded.
        self._variables.clear()
        for scope in self._scopes:
            ref = scope.get("variablesReference", 0)
            if ref:
                self.fetch_variables(ref)

    # -- Cache -----------------------------------------------------------------

    def _clear_cache(self) -> None:
        """Empty all cached session data and reset the current position."""
        self._threads.clear()
        self._stack_frames.clear()
        self._scopes.clear()
        self._variables.clear()
        self._current_thread_id = 0
        self._current_frame_id = 0
        self._current_file = ""
        self._current_line = 0

    def process(self, dt: float) -> None:
        """Tick the DAP client to poll subprocess I/O."""
        if self._client:
            self._client.process(dt)