"""Stdio proxy that sits between an LSP client and an LSP server process.

Messages read from this process's stdin are forwarded to the child server's
stdin, and messages read from the child's stdout are forwarded to this
process's stdout.  Every forwarded message is also shown to the registered
handlers (see ``handler``, ``method`` and ``command``).
"""
import inspect
import json
import os
import sys
from pathlib import Path
from subprocess import PIPE, Popen, run
from threading import Thread

try:
    from xdg import xdg_data_home
except ImportError:
    def xdg_data_home():
        """Fallback used when the third-party ``xdg`` package is missing.

        Follows the XDG Base Directory rules: use ``$XDG_DATA_HOME`` when it
        is set to an absolute path, otherwise ``~/.local/share``.
        """
        value = os.environ.get("XDG_DATA_HOME")
        if value and os.path.isabs(value):
            return Path(value)
        return Path.home() / ".local" / "share"


CONTENT_LENGTH = "Content-Length: "


def log(message, *args, **kwargs):
    """Write a diagnostic line to stderr.

    ``message`` is passed through ``str.format`` only when format arguments
    are supplied.  Callers that pre-build the text (for example with an
    f-string) may therefore include literal ``{`` / ``}`` characters, such as
    an exception text, without the logger itself raising.
    """
    if args or kwargs:
        message = message.format(*args, **kwargs)
    print(message, file=sys.stderr)


def _skip_remaining_headers(source):
    """Consume header lines up to and including the blank separator line."""
    while True:
        header_line = source.readline()
        # An empty read means EOF; a whitespace-only line ends the headers.
        if not header_line.strip():
            return


def process_messages(label, source, sink, handlers):
    """Forward framed messages from ``source`` to ``sink`` until EOF.

    Args:
        label: Text used to tag the log lines of this pump.
        source: Binary stream to read ``Content-Length`` framed messages from.
        sink: Binary stream each message is re-framed and written to.
        handlers: Callables invoked with the JSON-decoded payload before it
            is forwarded.  An exception raised by a handler is logged and
            does not stop the remaining handlers or the forwarding.

    Lines that do not start with the ``Content-Length`` header are skipped.
    Any other error while handling a message (bad length, undecodable bytes,
    invalid JSON when handlers are present) is logged and that message is
    not forwarded.
    """
    log(f"[{label}] start process_messages")
    while True:
        line = source.readline()
        if not line:
            break
        try:
            line = line.decode()
            if line.startswith(CONTENT_LENGTH):
                content_length = int(line[len(CONTENT_LENGTH):].strip())
                log(f">> [{label}] ce: [{content_length}]")
                # Other headers (e.g. Content-Type) may follow the length;
                # skip everything up to the blank line so the body read
                # starts at the right offset.
                _skip_remaining_headers(source)
                payload = source.read(content_length).decode()
                log(">> [{label}] payload: [{payload}]",
                    label=label, payload=payload)
                if handlers:
                    payload_parsed = json.loads(payload)
                    for callback in handlers:
                        try:
                            callback(payload_parsed)
                        except Exception as e:
                            log(f"Error from handler: {e}")
                transmit_payload(payload, sink)
        except Exception as e:
            log(f"[{label}] Error decoding input: {e}")
    log(f"[{label}] exit process_messages")


def transmit_payload(payload, sink):
    """Frame ``payload`` with a ``Content-Length`` header and write it.

    Args:
        payload: A ``dict`` (serialized with ``json.dumps``), a ``str``, or
            already-encoded ``bytes``.
        sink: Binary stream to write to; it is flushed after the write.

    The header carries the length of the encoded body in bytes, so payloads
    containing non-ASCII characters are framed correctly.
    """
    if isinstance(payload, dict):
        payload = json.dumps(payload)
    body = payload.encode() if isinstance(payload, str) else bytes(payload)
    header = f"{CONTENT_LENGTH}{len(body)}\r\n\r\n".encode()
    sink.write(header + body)
    sink.flush()


class Handlers:
    """Registry of message handlers, usable as a decorator."""

    def __init__(self):
        self.handlers = []

    def __call__(self, func):
        """Register ``func`` and return it unchanged."""
        self.handlers.append(func)
        return func

    def get_handlers(self, server):
        """Return one-argument callables that bind ``server`` to each handler.

        Each wrapper is created through a helper so it captures its own
        handler; lambdas built directly in a comprehension would all see the
        last value of the loop variable.
        """
        def bind(func):
            return lambda payload: func(server, payload)

        return [bind(func) for func in self.handlers]


handler = Handlers()


def method(name):
    """Decorator: register ``func`` for messages whose ``method`` is ``name``.

    The decorated function is called as ``func(server, params)``.  The
    decorator returns the registered wrapper, not ``func`` itself.
    """
    def method_handler(func):
        def handle_method(server, payload):
            if payload.get("method", None) != name:
                return
            func(server, payload.get("params"))
        return handler(handle_method)
    return method_handler


def command(name):
    """Decorator: register ``func`` for ``workspace/executeCommand`` messages.

    ``func`` is called as ``func(server, arguments)`` when the message's
    params carry ``command == name``.
    """
    def command_handler(func):
        def handle_command(server, payload):
            # ``payload`` is the message's params and is None when the
            # message has none; treat that as "not this command".
            if not payload or payload.get("command", None) != name:
                return
            func(server, payload.get("arguments"))
        return method("workspace/executeCommand")(handle_command)
    return command_handler


def get_data_home():
    """Return the ``lsp-proxy`` data directory, creating it if needed."""
    data_directory = os.path.join(xdg_data_home(), "lsp-proxy")
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(data_directory, exist_ok=True)
    return data_directory


class LspServer:
    """Runs an LSP server command and proxies stdio traffic to and from it."""

    def __init__(self, command):
        self.command = command
        self.handlers = []
        # Set by start(); initialised here so close() is safe before start().
        self.process = None
        self.process_reader = None

    def start(self):
        """Spawn the server and pump messages until this process's stdin ends.

        A background thread forwards the child's stdout to our stdout while
        the calling thread forwards our stdin to the child's stdin.
        """
        handlers = self.handlers + handler.get_handlers(self)
        log("starting lsp process: {}", " ".join(self.command))
        with Popen(self.command, stdin=PIPE, stdout=PIPE) as process:
            self.process = process
            self.process_reader = Thread(
                target=process_messages,
                args=["process.stdout reader", process.stdout,
                      sys.stdout.buffer, handlers],
            )
            self.process_reader.start()
            process_messages("process.stdin reader", sys.stdin.buffer,
                             process.stdin, handlers)
        log("lsp process ended")

    def start2(self):
        """Run the command directly with no proxying."""
        run(self.command)

    def send_to_client(self, payload):
        """Send ``payload`` to the client on this process's stdout."""
        transmit_payload(payload, sys.stdout.buffer)

    def send_to_server(self, payload):
        """Send ``payload`` to the child server.

        The server reads its input from its stdin pipe; its stdout pipe is
        the stream this proxy reads from.
        """
        transmit_payload(payload, self.process.stdin)

    def close(self):
        """Terminate the child process if one has been started."""
        if self.process is not None:
            self.process.terminate()