Source code for simvx.ide.lsp.client

"""Full LSP client using ProcessNode for subprocess management."""


from __future__ import annotations

import json
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any

from simvx.core import Signal
from simvx.core.process_node import ProcessNode

from ..state import Diagnostic as StateDiagnostic
from ..state import IDEState
from .protocol import (
    CompletionItem,
    Diagnostic,
    Hover,
    Location,
    decode_header,
    encode_message,
    notification,
    path_to_uri,
    request,
    response,
    uri_to_path,
)

log = logging.getLogger(__name__)


[docs] class LSPClient: """Language Server Protocol client communicating via ProcessNode stdin/stdout.""" def __init__( self, state: IDEState, command: str = "pylsp", args: list[str] | None = None, env: dict[str, str] | None = None ): self.state = state self._command = command self._args = args or [] self._env = env self._proc: ProcessNode | None = None self._buf = b"" self._pending: dict[int, Callable] = {} self._initialized = False self._server_capabilities: dict = {} self._doc_versions: dict[str, int] = {} self._open_docs: set[str] = set() self._pending_notifications: list[tuple[str, dict]] = [] # queued until initialized # Signals for raw events (advanced usage) self.server_started = Signal() self.server_stopped = Signal() self.notification_received = Signal() # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------
[docs] def start(self): if self._proc is not None: self.stop() cmd = self._command if self._args: cmd += " " + " ".join(self._args) self._proc = ProcessNode(command=cmd, env=self._env) self._proc.stdout_data.connect(self._on_stdout) self._proc.stderr_data.connect(self._on_stderr) self._proc.process_exited.connect(self._on_exit) self._proc.start() self._send_initialize()
[docs] def stop(self): if self._proc is None or not self._proc.running: self._cleanup() return msg_id, msg = request("shutdown") self._send(msg) self._pending[msg_id] = self._on_shutdown_response self._initialized = False
def _on_shutdown_response(self, _result: Any): self._send(notification("exit")) if self._proc is not None: self._proc.stop() self._cleanup() def _cleanup(self): self._buf = b"" self._pending.clear() self._initialized = False self._open_docs.clear() self._doc_versions.clear() self._pending_notifications.clear() self._proc = None self.server_stopped.emit() def _guard_initialized(self) -> bool: """Return True if server is initialized and ready for requests.""" return self._initialized and self._proc is not None and self._proc.running
[docs] def poll(self): """Must be called each frame to drive ProcessNode I/O.""" if self._proc is not None and self._proc.running: self._proc.process(0.0)
# ------------------------------------------------------------------ # Document notifications # ------------------------------------------------------------------
[docs] def notify_open(self, path: str, text: str, language_id: str = "python"): if not self._guard_initialized(): self._pending_notifications.append(("open", {"path": path, "text": text, "language_id": language_id})) return uri = path_to_uri(path) self._doc_versions[uri] = 1 self._open_docs.add(uri) self._send( notification( "textDocument/didOpen", {"textDocument": {"uri": uri, "languageId": language_id, "version": 1, "text": text}}, ) )
[docs] def notify_close(self, path: str): if not self._guard_initialized(): self._pending_notifications.append(("close", {"path": path})) return uri = path_to_uri(path) self._open_docs.discard(uri) self._doc_versions.pop(uri, None) self._send(notification("textDocument/didClose", {"textDocument": {"uri": uri}}))
[docs] def notify_save(self, path: str, text: str | None = None): if not self._guard_initialized(): self._pending_notifications.append(("save", {"path": path, "text": text})) return uri = path_to_uri(path) params: dict[str, Any] = {"textDocument": {"uri": uri}} if text is not None: params["text"] = text self._send(notification("textDocument/didSave", params))
[docs] def notify_change(self, path: str, text: str, version: int | None = None): if not self._guard_initialized(): self._pending_notifications.append(("change", {"path": path, "text": text, "version": version})) return uri = path_to_uri(path) if version is not None: self._doc_versions[uri] = version else: self._doc_versions[uri] = self._doc_versions.get(uri, 0) + 1 ver = self._doc_versions[uri] self._send( notification( "textDocument/didChange", { "textDocument": {"uri": uri, "version": ver}, "contentChanges": [{"text": text}], }, ) )
# ------------------------------------------------------------------ # Requests # ------------------------------------------------------------------
[docs] def request_completion(self, path: str, line: int, col: int): if not self._guard_initialized(): return uri = path_to_uri(path) msg_id, msg = request( "textDocument/completion", { "textDocument": {"uri": uri}, "position": {"line": line, "character": col}, }, ) self._send(msg) self._pending[msg_id] = self._handle_completion
[docs] def request_definition(self, path: str, line: int, col: int): if not self._guard_initialized(): return uri = path_to_uri(path) msg_id, msg = request( "textDocument/definition", { "textDocument": {"uri": uri}, "position": {"line": line, "character": col}, }, ) self._send(msg) self._pending[msg_id] = self._handle_definition
[docs] def request_hover(self, path: str, line: int, col: int): if not self._guard_initialized(): return uri = path_to_uri(path) msg_id, msg = request( "textDocument/hover", { "textDocument": {"uri": uri}, "position": {"line": line, "character": col}, }, ) self._send(msg) self._pending[msg_id] = self._handle_hover
[docs] def request_references(self, path: str, line: int, col: int): if not self._guard_initialized(): return uri = path_to_uri(path) msg_id, msg = request( "textDocument/references", { "textDocument": {"uri": uri}, "position": {"line": line, "character": col}, "context": {"includeDeclaration": True}, }, ) self._send(msg) self._pending[msg_id] = self._handle_references
[docs] def request_rename(self, path: str, line: int, col: int, new_name: str): if not self._guard_initialized(): return uri = path_to_uri(path) msg_id, msg = request( "textDocument/rename", { "textDocument": {"uri": uri}, "position": {"line": line, "character": col}, "newName": new_name, }, ) self._send(msg) self._pending[msg_id] = self._handle_rename
[docs] def request_formatting(self, path: str, tab_size: int = 4, insert_spaces: bool = True): if not self._guard_initialized(): return uri = path_to_uri(path) msg_id, msg = request( "textDocument/formatting", { "textDocument": {"uri": uri}, "options": {"tabSize": tab_size, "insertSpaces": insert_spaces}, }, ) self._send(msg) self._pending[msg_id] = lambda result, _path=path: self._handle_formatting(result, _path)
# ------------------------------------------------------------------ # Sending # ------------------------------------------------------------------ def _send(self, msg: dict): if self._proc is None or not self._proc.running: return data = encode_message(msg) self._proc.write(data) # ------------------------------------------------------------------ # Initialize handshake # ------------------------------------------------------------------ def _send_initialize(self): root = self.state.project_root or str(Path.cwd()) msg_id, msg = request( "initialize", { "processId": None, "rootUri": path_to_uri(root), "rootPath": root, "capabilities": { "textDocument": { "synchronization": { "dynamicRegistration": True, "willSave": False, "willSaveWaitUntil": False, "didSave": True, }, "completion": { "dynamicRegistration": True, "completionItem": { "snippetSupport": False, "commitCharactersSupport": True, "deprecatedSupport": True, "preselectSupport": True, "labelDetailsSupport": True, "insertReplaceSupport": False, "resolveSupport": {"properties": ["documentation", "detail"]}, }, "contextSupport": True, }, "hover": {"dynamicRegistration": True, "contentFormat": ["plaintext", "markdown"]}, "definition": {"dynamicRegistration": True, "linkSupport": False}, "references": {"dynamicRegistration": True}, "rename": {"dynamicRegistration": True, "prepareSupport": False}, "formatting": {"dynamicRegistration": True}, "publishDiagnostics": {"relatedInformation": False, "tagSupport": {"valueSet": [1, 2]}}, }, "workspace": { "configuration": True, "workspaceFolders": False, "didChangeConfiguration": {"dynamicRegistration": True}, }, }, "workspaceFolders": None, }, ) self._send(msg) self._pending[msg_id] = self._on_initialize_result def _on_initialize_result(self, result: dict): self._server_capabilities = result.get("capabilities", {}) self._send(notification("initialized", {})) self._initialized = True log.info("LSP server initialized: %s", self._command) # Replay queued notifications now that the server is ready pending = self._pending_notifications[:] self._pending_notifications.clear() for kind, params in pending: if kind == "open": self.notify_open(params["path"], params["text"], params.get("language_id", "python")) elif kind == "close": self.notify_close(params["path"]) elif kind == "save": self.notify_save(params["path"], params.get("text")) elif kind == "change": self.notify_change(params["path"], params["text"], params.get("version")) self.server_started.emit() # ------------------------------------------------------------------ # Incoming data # ------------------------------------------------------------------ def _on_stdout(self, text: str): self._buf += text.encode("utf-8", errors="replace") self._process_buffer() def _on_stderr(self, text: str): for line in text.strip().splitlines(): log.debug("LSP stderr: %s", line) def _on_exit(self, code: int): log.info("LSP server exited with code %d", code) self._cleanup() def _process_buffer(self): while True: parsed = decode_header(self._buf) if parsed is None: return content_length, header_end = parsed total = header_end + content_length if len(self._buf) < total: return body = self._buf[header_end:total] self._buf = self._buf[total:] try: msg = json.loads(body) except json.JSONDecodeError as e: log.error("LSP JSON decode error: %s", e) continue self._dispatch(msg) def _dispatch(self, msg: dict): if "id" in msg and "method" in msg: self._handle_server_request(msg) elif "id" in msg: self._handle_response(msg) elif "method" in msg: self._handle_notification(msg) # ------------------------------------------------------------------ # Response handling # ------------------------------------------------------------------ def _handle_response(self, msg: dict): msg_id = msg["id"] handler = self._pending.pop(msg_id, None) if handler is None: return error = msg.get("error") if error: log.warning("LSP error (id=%s): [%s] %s", msg_id, error.get("code"), error.get("message")) return handler(msg.get("result")) # ------------------------------------------------------------------ # Server request handling # ------------------------------------------------------------------ def _handle_server_request(self, msg: dict): method = msg["method"] msg_id = msg["id"] if method == "workspace/configuration": items = msg.get("params", {}).get("items", []) self._send(response(msg_id, result=[{} for _ in items])) elif method == "client/registerCapability": self._send(response(msg_id, result=None)) elif method == "window/workDoneProgress/create": self._send(response(msg_id, result=None)) else: log.debug("Unhandled server request: %s", method) self._send(response(msg_id, result=None)) # ------------------------------------------------------------------ # Notification handling # ------------------------------------------------------------------ def _handle_notification(self, msg: dict): method = msg["method"] params = msg.get("params", {}) if method == "textDocument/publishDiagnostics": self._handle_diagnostics(params) elif method == "window/logMessage" or method == "window/showMessage": text = params.get("message", "") log.debug("LSP [%s]: %s", method, text) elif method.startswith("$/"): pass # progress, cancel, etc. else: self.notification_received.emit(method, params) # ------------------------------------------------------------------ # Result handlers # ------------------------------------------------------------------ def _handle_completion(self, result: Any): if result is None: self.state.completion_received.emit([]) return raw_items = result.get("items", result) if isinstance(result, dict) else result if not isinstance(raw_items, list): self.state.completion_received.emit([]) return items = [CompletionItem.from_dict(item) for item in raw_items] items.sort(key=lambda c: c.sort_text or c.label) self.state.completion_received.emit(items) def _handle_definition(self, result: Any): if result is None: self.state.definition_received.emit([]) return if isinstance(result, dict): result = [result] locations = [Location.from_dict(loc) for loc in result] self.state.definition_received.emit(locations) def _handle_hover(self, result: Any): if result is None: return hover = Hover.from_dict(result) line = col = 0 if hover.range: line = hover.range.start.line col = hover.range.start.character self.state.hover_received.emit(hover.contents, line, col) def _handle_references(self, result: Any): if result is None: self.state.references_received.emit([]) return locations = [Location.from_dict(loc) for loc in result] self.state.references_received.emit(locations) def _handle_rename(self, result: Any): if result is None: return file_edits: dict[str, list[tuple[int, int, int, int, str]]] = {} changes = result.get("changes", {}) document_changes = result.get("documentChanges", []) if document_changes: for doc_change in document_changes: raw_edits = doc_change.get("edits", []) uri = doc_change.get("textDocument", {}).get("uri", "") path = uri_to_path(uri) file_edits[path] = self._convert_lsp_edits(raw_edits) elif changes: for uri, raw_edits in changes.items(): path = uri_to_path(uri) file_edits[path] = self._convert_lsp_edits(raw_edits) total = sum(len(e) for e in file_edits.values()) if file_edits: self.state.rename_edits_received.emit(file_edits) self.state.status_message.emit(f"Rename: {total} edits across {len(file_edits)} file(s)") else: self.state.status_message.emit("Rename: no edits returned") @staticmethod def _convert_lsp_edits(raw_edits: list[dict]) -> list[tuple[int, int, int, int, str]]: """Convert LSP TextEdit dicts to ``(start_line, start_col, end_line, end_col, new_text)`` tuples.""" out: list[tuple[int, int, int, int, str]] = [] for edit in raw_edits: r = edit.get("range", {}) start = r.get("start", {}) end = r.get("end", {}) out.append(( start.get("line", 0), start.get("character", 0), end.get("line", 0), end.get("character", 0), edit.get("newText", ""), )) return out def _handle_formatting(self, result: Any, path: str): if not result: self.state.status_message.emit("Formatting: no edits returned") return edits = self._convert_lsp_edits(result) if edits: self.state.formatting_edits_received.emit(path, edits) self.state.status_message.emit(f"Formatting: {len(edits)} edits applied") else: self.state.status_message.emit("Formatting: no edits returned") def _handle_diagnostics(self, params: dict): uri = params.get("uri", "") path = uri_to_path(uri) raw_diags = params.get("diagnostics", []) diags = [] for d in raw_diags: proto_diag = Diagnostic.from_dict(d) diags.append( StateDiagnostic( path=path, line=proto_diag.range.start.line, col_start=proto_diag.range.start.character, col_end=proto_diag.range.end.character, severity=proto_diag.severity, message=proto_diag.message, source=proto_diag.source, code=proto_diag.code, ) ) self.state.set_diagnostics(path, diags)