"""LSP JSON-RPC message types and serialization."""
from __future__ import annotations
import itertools
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from urllib.parse import unquote, urlparse
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Shared monotonically increasing counter (starts at 1) that request() draws
# its message ids from.
_id_counter = itertools.count(1)
# ---------------------------------------------------------------------------
# JSON-RPC framing
# ---------------------------------------------------------------------------
[docs]
def encode_message(obj: dict) -> bytes:
body = json.dumps(obj, separators=(",", ":")).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
return header + body
# ---------------------------------------------------------------------------
# Message constructors
# ---------------------------------------------------------------------------
[docs]
def request(method: str, params: dict | list | None = None) -> tuple[int, dict]:
    """Build a JSON-RPC 2.0 request message with a freshly allocated id.

    Args:
        method: Name of the method to invoke.
        params: Optional parameters; the ``"params"`` key is omitted entirely
            when this is ``None``.

    Returns:
        A ``(msg_id, message)`` pair, where ``msg_id`` is the next value of
        the module-level id counter and ``message`` is the request dict.
    """
    msg_id = next(_id_counter)
    envelope: dict = {"jsonrpc": "2.0", "id": msg_id, "method": method}
    if params is None:
        return msg_id, envelope
    envelope["params"] = params
    return msg_id, envelope
[docs]
def notification(method: str, params: dict | list | None = None) -> dict:
msg: dict = {"jsonrpc": "2.0", "method": method}
if params is not None:
msg["params"] = params
return msg
[docs]
def response(msg_id: int | str, result: object = None, error: dict | None = None) -> dict:
msg: dict = {"jsonrpc": "2.0", "id": msg_id}
if error is not None:
msg["error"] = error
else:
msg["result"] = result
return msg
# ---------------------------------------------------------------------------
# URI helpers
# ---------------------------------------------------------------------------
[docs]
def path_to_uri(path: str | Path) -> str:
return Path(path).resolve().as_uri()
[docs]
def uri_to_path(uri: str) -> str:
    """Convert a ``file:`` URI to a native filesystem path string.

    The URI's path component is percent-decoded and normalized through
    ``Path``. On Windows two extra adjustments are made so the result is a
    usable path:

    * ``file:///C:/dir/x`` has the URL path ``/C:/dir/x``; the leading slash
      is dropped so the drive letter is recognized (this also covers the
      ``c%3A`` spelling, since the check runs after decoding).
    * ``file://server/share/x`` carries the host in the authority part; it is
      folded back into a ``//server/share/x`` UNC path instead of being lost.

    On other platforms the result is the decoded URL path, unchanged.

    Args:
        uri: The URI to convert.

    Returns:
        The path as a string in the platform's native form.
    """
    import os  # local import: only needed for the platform check

    parsed = urlparse(uri)
    raw = unquote(parsed.path)
    if os.name == "nt":
        host = parsed.netloc
        if host and host.lower() != "localhost":
            # UNC share: the server name lives in the authority component.
            raw = f"//{host}{raw}"
        elif len(raw) >= 3 and raw[0] == "/" and raw[1].isalpha() and raw[2] == ":":
            # "/C:/dir" -> "C:/dir" so Path sees a drive instead of a root dir.
            raw = raw[1:]
    return str(Path(raw))
# ---------------------------------------------------------------------------
# CompletionItemKind
# ---------------------------------------------------------------------------
# Re-export completion types from core
from simvx.core.ui.completion_types import ( # noqa: E402, F401
KIND_ABBREVIATIONS,
CompletionItemKind,
kind_abbreviation,
)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
[docs]
@dataclass(slots=True)
class Position:
line: int = 0
character: int = 0
[docs]
def to_dict(self) -> dict:
return {"line": self.line, "character": self.character}
[docs]
@classmethod
def from_dict(cls, d: dict) -> Position:
return cls(line=d.get("line", 0), character=d.get("character", 0))
[docs]
@dataclass(slots=True)
class Range:
    """A span delimited by a ``start`` and an ``end`` :class:`Position`.

    Both endpoints default to a fresh default-constructed ``Position``.
    """

    start: Position = field(default_factory=Position)
    end: Position = field(default_factory=Position)

    def to_dict(self) -> dict:
        """Return ``{"start": ..., "end": ...}`` with each endpoint as a dict."""
        return {edge: getattr(self, edge).to_dict() for edge in ("start", "end")}

    @classmethod
    def from_dict(cls, d: dict) -> Range:
        """Build a range from a dict; a missing endpoint is parsed from ``{}``."""
        first = Position.from_dict(d.get("start", {}))
        last = Position.from_dict(d.get("end", {}))
        return cls(start=first, end=last)
[docs]
@dataclass(slots=True)
class Diagnostic:
    """A diagnostic record: a range plus severity, message, source and code."""

    range: Range = field(default_factory=Range)
    severity: int = 1
    message: str = ""
    source: str = ""
    code: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> Diagnostic:
        """Build a diagnostic from a dict, filling defaults for missing keys.

        Missing ``"range"`` is parsed from ``{}``, missing ``"severity"``
        becomes 1, and missing text fields become ``""``. The ``"code"``
        value is always passed through ``str()``.
        """
        span = Range.from_dict(d.get("range", {}))
        level = d.get("severity", 1)
        text = d.get("message", "")
        origin = d.get("source", "")
        # Stringified so the field holds a str whatever type the dict carried.
        ident = str(d.get("code", ""))
        return cls(span, level, text, origin, ident)
# Re-export CompletionItem from core
from simvx.core.ui.completion_types import CompletionItem # noqa: E402, F401
[docs]
@dataclass(slots=True)
class Location:
    """A URI together with a :class:`Range` inside the resource it names."""

    uri: str = ""
    range: Range = field(default_factory=Range)

    @classmethod
    def from_dict(cls, d: dict) -> Location:
        """Build a location from a dict.

        Missing ``"uri"`` becomes ``""`` and missing ``"range"`` is parsed
        from ``{}``.
        """
        target = d.get("uri", "")
        span = Range.from_dict(d.get("range", {}))
        return cls(target, span)

    @property
    def path(self) -> str:
        """The location's URI converted by :func:`uri_to_path`."""
        return uri_to_path(self.uri)
[docs]
@dataclass(slots=True)
class Hover:
contents: str = ""
range: Range | None = None
[docs]
@classmethod
def from_dict(cls, d: dict) -> Hover:
raw = d.get("contents", "")
if isinstance(raw, dict):
text = raw.get("value", str(raw))
elif isinstance(raw, list):
parts = []
for item in raw:
parts.append(item.get("value", str(item)) if isinstance(item, dict) else str(item))
text = "\n".join(parts)
else:
text = str(raw)
r = Range.from_dict(d["range"]) if "range" in d else None
return cls(contents=text, range=r)