"""Full LSP client using ProcessNode for subprocess management."""
from __future__ import annotations
import json
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any
from simvx.core import Signal
from simvx.core.process_node import ProcessNode
from ..state import Diagnostic as StateDiagnostic
from ..state import IDEState
from .protocol import (
CompletionItem,
Diagnostic,
Hover,
Location,
decode_header,
encode_message,
notification,
path_to_uri,
request,
response,
uri_to_path,
)
# Module-level logger, named after this module's import path via ``__name__``.
log = logging.getLogger(__name__)
[docs]
class LSPClient:
    """Language Server Protocol client communicating via ProcessNode stdin/stdout.

    The client spawns the server through a ``ProcessNode``, performs the
    ``initialize`` handshake and then exchanges header-framed JSON messages
    with it.  Request results are published on signals of the ``IDEState``
    passed to the constructor; lifecycle events are exposed as signals on the
    client itself (``server_started``, ``server_stopped``,
    ``notification_received``).

    ``poll()`` must be called regularly to pump process I/O.  Document
    notifications issued before the server is initialized are queued and
    replayed once the handshake completes.
    """

    def __init__(
        self, state: IDEState, command: str = "pylsp", args: list[str] | None = None, env: dict[str, str] | None = None
    ):
        """Store configuration; the server is not launched until ``start()``.

        Args:
            state: IDE state object whose signals receive request results.
            command: Executable used to launch the language server.
            args: Extra command-line arguments appended to ``command``.
            env: Environment passed through to ``ProcessNode``.
        """
        self.state = state
        self._command = command
        # Copy so later mutation of the caller's list cannot change our command line.
        self._args = list(args) if args else []
        self._env = env
        self._proc: ProcessNode | None = None
        self._buf = b""  # raw bytes received from the server, not yet parsed
        self._pending: dict[int, Callable] = {}  # request id -> result handler
        self._initialized = False
        self._server_capabilities: dict = {}
        self._doc_versions: dict[str, int] = {}  # uri -> last version sent
        self._open_docs: set[str] = set()
        self._pending_notifications: list[tuple[str, dict]] = []  # queued until initialized
        # Signals for raw events (advanced usage)
        self.server_started = Signal()
        self.server_stopped = Signal()
        self.notification_received = Signal()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def start(self):
        """Launch the server process and begin the ``initialize`` handshake.

        If a process is already attached it is torn down synchronously first.
        The graceful ``stop()`` path is asynchronous (it waits for the
        ``shutdown`` response), so using it here would leave the old process
        attached to nothing once ``self._proc`` is replaced below.
        """
        if self._proc is not None:
            if self._proc.running:
                # Best effort: ask for an orderly shutdown, but do not wait for the reply.
                _, shutdown_msg = request("shutdown")
                self._send(shutdown_msg)
            self._terminate()
        cmd = self._command
        if self._args:
            # NOTE(review): arguments are joined with plain spaces, so an argument
            # containing whitespace is not quoted — confirm how ProcessNode splits
            # its ``command`` string before changing this.
            cmd += " " + " ".join(self._args)
        proc = ProcessNode(command=cmd, env=self._env)
        proc.stdout_data.connect(self._on_stdout)
        proc.stderr_data.connect(self._on_stderr)
        proc.process_exited.connect(self._on_exit)
        self._proc = proc
        proc.start()
        self._send_initialize()

    def stop(self):
        """Request a graceful shutdown of the server.

        When no server is running the client state is simply reset.  Otherwise
        a ``shutdown`` request is sent and the process is stopped once the
        server answers (see ``_on_shutdown_response``).
        """
        if self._proc is None or not self._proc.running:
            self._cleanup()
            return
        msg_id, msg = request("shutdown")
        self._send(msg)
        self._pending[msg_id] = self._on_shutdown_response
        self._initialized = False

    def _on_shutdown_response(self, _result: Any):
        """Finish the graceful shutdown once the server acknowledged it."""
        self._terminate()

    def _terminate(self):
        """Send ``exit``, stop the attached process and reset client state."""
        self._send(notification("exit"))  # no-op when the process is not running
        # Detach before stopping so an exit signal raised from within ``stop()``
        # is ignored by ``_on_exit`` instead of triggering a second cleanup.
        proc, self._proc = self._proc, None
        if proc is not None:
            proc.stop()
        self._cleanup()

    def _cleanup(self):
        """Reset all per-session state and emit ``server_stopped``."""
        self._buf = b""
        self._pending.clear()
        self._initialized = False
        self._open_docs.clear()
        self._doc_versions.clear()
        self._pending_notifications.clear()
        self._proc = None
        self.server_stopped.emit()

    def _guard_initialized(self) -> bool:
        """Return True if server is initialized and ready for requests."""
        return self._initialized and self._proc is not None and self._proc.running

    def poll(self):
        """Must be called each frame to drive ProcessNode I/O."""
        if self._proc is not None and self._proc.running:
            self._proc.process(0.0)

    # ------------------------------------------------------------------
    # Document notifications
    # ------------------------------------------------------------------
    def _queue_notification(self, kind: str, params: dict) -> None:
        """Queue a document notification for replay after initialization.

        ``change`` notifications always carry the full document text, so a
        ``change`` that directly follows another ``change`` for the same path
        supersedes it.  Coalescing them keeps the queue from growing by one
        full document copy per edit while the server is unavailable.
        """
        queue = self._pending_notifications
        if kind == "change" and queue:
            last_kind, last_params = queue[-1]
            if last_kind == "change" and last_params["path"] == params["path"]:
                queue[-1] = (kind, params)
                return
        queue.append((kind, params))

    def notify_open(self, path: str, text: str, language_id: str = "python"):
        """Send ``textDocument/didOpen`` for *path* (queued if not initialized)."""
        if not self._guard_initialized():
            self._queue_notification("open", {"path": path, "text": text, "language_id": language_id})
            return
        uri = path_to_uri(path)
        self._doc_versions[uri] = 1
        self._open_docs.add(uri)
        self._send(
            notification(
                "textDocument/didOpen",
                {"textDocument": {"uri": uri, "languageId": language_id, "version": 1, "text": text}},
            )
        )

    def notify_close(self, path: str):
        """Send ``textDocument/didClose`` for *path* (queued if not initialized)."""
        if not self._guard_initialized():
            self._queue_notification("close", {"path": path})
            return
        uri = path_to_uri(path)
        self._open_docs.discard(uri)
        self._doc_versions.pop(uri, None)
        self._send(notification("textDocument/didClose", {"textDocument": {"uri": uri}}))

    def notify_save(self, path: str, text: str | None = None):
        """Send ``textDocument/didSave``; *text* is included only when given."""
        if not self._guard_initialized():
            self._queue_notification("save", {"path": path, "text": text})
            return
        uri = path_to_uri(path)
        params: dict[str, Any] = {"textDocument": {"uri": uri}}
        if text is not None:
            params["text"] = text
        self._send(notification("textDocument/didSave", params))

    def notify_change(self, path: str, text: str, version: int | None = None):
        """Send ``textDocument/didChange`` with the full new document text.

        Args:
            path: File whose content changed.
            text: Complete new content of the document.
            version: Explicit document version; when omitted the last version
                sent for this document is incremented by one.
        """
        if not self._guard_initialized():
            self._queue_notification("change", {"path": path, "text": text, "version": version})
            return
        uri = path_to_uri(path)
        if version is None:
            version = self._doc_versions.get(uri, 0) + 1
        self._doc_versions[uri] = version
        self._send(
            notification(
                "textDocument/didChange",
                {
                    "textDocument": {"uri": uri, "version": version},
                    "contentChanges": [{"text": text}],
                },
            )
        )

    # ------------------------------------------------------------------
    # Requests
    # ------------------------------------------------------------------
    def _request_at_position(
        self,
        method: str,
        path: str,
        line: int,
        col: int,
        handler: Callable,
        extra: dict[str, Any] | None = None,
    ) -> None:
        """Send a position-based request and register *handler* for its result.

        Does nothing when the server is not initialized.  *extra* holds
        additional top-level request parameters.
        """
        if not self._guard_initialized():
            return
        params: dict[str, Any] = {
            "textDocument": {"uri": path_to_uri(path)},
            "position": {"line": line, "character": col},
        }
        if extra:
            params.update(extra)
        msg_id, msg = request(method, params)
        self._send(msg)
        self._pending[msg_id] = handler

    def request_completion(self, path: str, line: int, col: int):
        """Request completions at a position; result goes to ``state.completion_received``."""
        self._request_at_position("textDocument/completion", path, line, col, self._handle_completion)

    def request_definition(self, path: str, line: int, col: int):
        """Request definition locations; result goes to ``state.definition_received``."""
        self._request_at_position("textDocument/definition", path, line, col, self._handle_definition)

    def request_hover(self, path: str, line: int, col: int):
        """Request hover information; result goes to ``state.hover_received``."""
        self._request_at_position("textDocument/hover", path, line, col, self._handle_hover)

    def request_references(self, path: str, line: int, col: int):
        """Request references (declaration included); result goes to ``state.references_received``."""
        self._request_at_position(
            "textDocument/references",
            path,
            line,
            col,
            self._handle_references,
            {"context": {"includeDeclaration": True}},
        )

    def request_rename(self, path: str, line: int, col: int, new_name: str):
        """Request a rename to *new_name*; edits go to ``state.rename_edits_received``."""
        self._request_at_position(
            "textDocument/rename", path, line, col, self._handle_rename, {"newName": new_name}
        )

    def request_formatting(self, path: str, tab_size: int = 4, insert_spaces: bool = True):
        """Request whole-document formatting; edits go to ``state.formatting_edits_received``.

        Args:
            path: File to format.
            tab_size: Size of a tab in spaces, sent as ``options.tabSize``.
            insert_spaces: Prefer spaces over tabs, sent as ``options.insertSpaces``.
        """
        if not self._guard_initialized():
            return
        msg_id, msg = request(
            "textDocument/formatting",
            {
                "textDocument": {"uri": path_to_uri(path)},
                "options": {"tabSize": tab_size, "insertSpaces": insert_spaces},
            },
        )
        self._send(msg)
        # The handler needs the path as well as the result, so bind it here.
        self._pending[msg_id] = lambda result: self._handle_formatting(result, path)

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------
    def _send(self, msg: dict):
        """Encode *msg* and write it to the server; silently dropped if not running."""
        if self._proc is None or not self._proc.running:
            return
        self._proc.write(encode_message(msg))

    # ------------------------------------------------------------------
    # Initialize handshake
    # ------------------------------------------------------------------
    def _send_initialize(self):
        """Send the ``initialize`` request advertising this client's capabilities."""
        root = self.state.project_root or str(Path.cwd())
        msg_id, msg = request(
            "initialize",
            {
                "processId": None,
                "rootUri": path_to_uri(root),
                "rootPath": root,
                "capabilities": {
                    "textDocument": {
                        "synchronization": {
                            "dynamicRegistration": True,
                            "willSave": False,
                            "willSaveWaitUntil": False,
                            "didSave": True,
                        },
                        "completion": {
                            "dynamicRegistration": True,
                            "completionItem": {
                                "snippetSupport": False,
                                "commitCharactersSupport": True,
                                "deprecatedSupport": True,
                                "preselectSupport": True,
                                "labelDetailsSupport": True,
                                "insertReplaceSupport": False,
                                "resolveSupport": {"properties": ["documentation", "detail"]},
                            },
                            "contextSupport": True,
                        },
                        "hover": {"dynamicRegistration": True, "contentFormat": ["plaintext", "markdown"]},
                        "definition": {"dynamicRegistration": True, "linkSupport": False},
                        "references": {"dynamicRegistration": True},
                        "rename": {"dynamicRegistration": True, "prepareSupport": False},
                        "formatting": {"dynamicRegistration": True},
                        "publishDiagnostics": {"relatedInformation": False, "tagSupport": {"valueSet": [1, 2]}},
                    },
                    "workspace": {
                        "configuration": True,
                        "workspaceFolders": False,
                        "didChangeConfiguration": {"dynamicRegistration": True},
                    },
                },
                "workspaceFolders": None,
            },
        )
        self._send(msg)
        self._pending[msg_id] = self._on_initialize_result

    def _on_initialize_result(self, result: dict):
        """Complete the handshake, replay queued notifications, emit ``server_started``."""
        # A null result would otherwise raise before the handshake completes.
        self._server_capabilities = (result or {}).get("capabilities", {})
        self._send(notification("initialized", {}))
        self._initialized = True
        log.info("LSP server initialized: %s", self._command)
        # Replay queued notifications now that the server is ready.  The queue is
        # swapped out first so anything re-queued during replay is not lost.
        queued = self._pending_notifications[:]
        self._pending_notifications.clear()
        for kind, params in queued:
            if kind == "open":
                self.notify_open(params["path"], params["text"], params.get("language_id", "python"))
            elif kind == "close":
                self.notify_close(params["path"])
            elif kind == "save":
                self.notify_save(params["path"], params.get("text"))
            elif kind == "change":
                self.notify_change(params["path"], params["text"], params.get("version"))
        self.server_started.emit()

    # ------------------------------------------------------------------
    # Incoming data
    # ------------------------------------------------------------------
    def _on_stdout(self, text: str):
        """Append server output to the receive buffer and parse complete messages."""
        self._buf += text.encode("utf-8", errors="replace")
        self._process_buffer()

    def _on_stderr(self, text: str):
        """Forward server stderr output to the debug log, one line at a time."""
        for line in text.strip().splitlines():
            log.debug("LSP stderr: %s", line)

    def _on_exit(self, code: int):
        """Reset client state when the server process exits."""
        log.info("LSP server exited with code %d", code)
        if self._proc is None:
            # Already torn down (e.g. by ``_terminate``); avoid a second
            # ``server_stopped`` emission.
            return
        self._cleanup()

    def _process_buffer(self):
        """Extract and dispatch every complete message currently in the buffer."""
        while True:
            parsed = decode_header(self._buf)
            if parsed is None:
                return
            content_length, header_end = parsed
            total = header_end + content_length
            if len(self._buf) < total:
                return  # body not fully received yet
            body = self._buf[header_end:total]
            self._buf = self._buf[total:]
            try:
                msg = json.loads(body)
            except json.JSONDecodeError as e:
                log.error("LSP JSON decode error: %s", e)
                continue
            self._dispatch(msg)

    def _dispatch(self, msg: dict):
        """Route a decoded message to the request, response or notification handler."""
        if not isinstance(msg, dict):
            # Valid JSON that is not an object cannot be a message we understand.
            log.debug("Ignoring non-object LSP message: %r", msg)
            return
        if "id" in msg and "method" in msg:
            self._handle_server_request(msg)
        elif "id" in msg:
            self._handle_response(msg)
        elif "method" in msg:
            self._handle_notification(msg)

    # ------------------------------------------------------------------
    # Response handling
    # ------------------------------------------------------------------
    def _handle_response(self, msg: dict):
        """Invoke the handler registered for a response; errors are logged only."""
        msg_id = msg["id"]
        handler = self._pending.pop(msg_id, None)
        if handler is None:
            return
        error = msg.get("error")
        if error:
            log.warning("LSP error (id=%s): [%s] %s", msg_id, error.get("code"), error.get("message"))
            if handler == self._on_shutdown_response:
                # A failed ``shutdown`` must not leave the process running forever.
                handler(None)
            return
        handler(msg.get("result"))

    # ------------------------------------------------------------------
    # Server request handling
    # ------------------------------------------------------------------
    def _handle_server_request(self, msg: dict):
        """Answer a request initiated by the server.

        ``workspace/configuration`` gets one empty settings object per
        requested item; every other request is acknowledged with a null result.
        """
        method = msg["method"]
        msg_id = msg["id"]
        if method == "workspace/configuration":
            items = (msg.get("params") or {}).get("items", [])
            self._send(response(msg_id, result=[{} for _ in items]))
            return
        if method not in ("client/registerCapability", "window/workDoneProgress/create"):
            log.debug("Unhandled server request: %s", method)
        self._send(response(msg_id, result=None))

    # ------------------------------------------------------------------
    # Notification handling
    # ------------------------------------------------------------------
    def _handle_notification(self, msg: dict):
        """Process a server notification; unknown ones are re-emitted as a signal."""
        method = msg["method"]
        params = msg.get("params") or {}  # tolerate an explicit null
        if method == "textDocument/publishDiagnostics":
            self._handle_diagnostics(params)
        elif method in ("window/logMessage", "window/showMessage"):
            log.debug("LSP [%s]: %s", method, params.get("message", ""))
        elif method.startswith("$/"):
            pass  # progress, cancel, etc.
        else:
            self.notification_received.emit(method, params)

    # ------------------------------------------------------------------
    # Result handlers
    # ------------------------------------------------------------------
    def _handle_completion(self, result: Any):
        """Emit completion items sorted by ``sort_text`` (falling back to ``label``)."""
        if result is None:
            self.state.completion_received.emit([])
            return
        # The result is either a bare list of items or an object holding "items".
        raw_items = result.get("items", result) if isinstance(result, dict) else result
        if not isinstance(raw_items, list):
            self.state.completion_received.emit([])
            return
        items = [CompletionItem.from_dict(item) for item in raw_items]
        items.sort(key=lambda c: c.sort_text or c.label)
        self.state.completion_received.emit(items)

    def _handle_definition(self, result: Any):
        """Emit definition locations; a single location is wrapped in a list."""
        if result is None:
            self.state.definition_received.emit([])
            return
        if isinstance(result, dict):
            result = [result]
        self.state.definition_received.emit([Location.from_dict(loc) for loc in result])

    def _handle_hover(self, result: Any):
        """Emit hover contents with the start of its range (0, 0 when absent)."""
        if result is None:
            return
        hover = Hover.from_dict(result)
        line = col = 0
        if hover.range:
            line = hover.range.start.line
            col = hover.range.start.character
        self.state.hover_received.emit(hover.contents, line, col)

    def _handle_references(self, result: Any):
        """Emit reference locations (an empty list for a null result)."""
        if result is None:
            self.state.references_received.emit([])
            return
        self.state.references_received.emit([Location.from_dict(loc) for loc in result])

    def _handle_rename(self, result: Any):
        """Convert a rename result into per-file edit tuples and emit them.

        ``documentChanges`` takes precedence over ``changes``.  Edits for the
        same file are merged, and entries without a ``textDocument`` URI are
        skipped because they carry no text edits for a file.
        """
        if result is None:
            return
        file_edits: dict[str, list[tuple[int, int, int, int, str]]] = {}
        document_changes = result.get("documentChanges") or []
        changes = result.get("changes") or {}
        if document_changes:
            for doc_change in document_changes:
                uri = (doc_change.get("textDocument") or {}).get("uri")
                if not uri:
                    continue
                edits = self._convert_lsp_edits(doc_change.get("edits", []))
                # Extend rather than assign: one file may appear in several entries.
                file_edits.setdefault(uri_to_path(uri), []).extend(edits)
        elif changes:
            for uri, raw_edits in changes.items():
                file_edits.setdefault(uri_to_path(uri), []).extend(self._convert_lsp_edits(raw_edits))
        if file_edits:
            total = sum(len(e) for e in file_edits.values())
            self.state.rename_edits_received.emit(file_edits)
            self.state.status_message.emit(f"Rename: {total} edits across {len(file_edits)} file(s)")
        else:
            self.state.status_message.emit("Rename: no edits returned")

    @staticmethod
    def _convert_lsp_edits(raw_edits: list[dict]) -> list[tuple[int, int, int, int, str]]:
        """Convert LSP TextEdit dicts to ``(start_line, start_col, end_line, end_col, new_text)`` tuples.

        Missing range components default to ``0`` and a missing ``newText`` to ``""``.
        """
        out: list[tuple[int, int, int, int, str]] = []
        for edit in raw_edits:
            rng = edit.get("range", {})
            start = rng.get("start", {})
            end = rng.get("end", {})
            out.append(
                (
                    start.get("line", 0),
                    start.get("character", 0),
                    end.get("line", 0),
                    end.get("character", 0),
                    edit.get("newText", ""),
                )
            )
        return out

    def _handle_formatting(self, result: Any, path: str):
        """Emit formatting edits for *path*, or a status message when there are none."""
        edits = self._convert_lsp_edits(result) if result else []
        if not edits:
            self.state.status_message.emit("Formatting: no edits returned")
            return
        self.state.formatting_edits_received.emit(path, edits)
        self.state.status_message.emit(f"Formatting: {len(edits)} edits applied")

    def _handle_diagnostics(self, params: dict):
        """Translate published diagnostics into state diagnostics for the file."""
        path = uri_to_path(params.get("uri", ""))
        diags = []
        for raw in params.get("diagnostics") or []:
            proto_diag = Diagnostic.from_dict(raw)
            diags.append(
                StateDiagnostic(
                    path=path,
                    line=proto_diag.range.start.line,
                    col_start=proto_diag.range.start.character,
                    col_end=proto_diag.range.end.character,
                    severity=proto_diag.severity,
                    message=proto_diag.message,
                    source=proto_diag.source,
                    code=proto_diag.code,
                )
            )
        self.state.set_diagnostics(path, diags)