124 lines
3.8 KiB
Python

import os
import sys
import inspect
import json
from subprocess import run, Popen, PIPE
from threading import Thread
from xdg import xdg_data_home
CONTENT_LENGTH = "Content-Length: "
def log (message, *args, **kwargs):
print(message.format(*args, **kwargs), file=sys.stderr)
def process_messages (label, source, sink, handlers):
log(f"[{label}] start process_messages")
while True:
line = source.readline()
if not line:
break
try:
line = line.decode()
if line.startswith(CONTENT_LENGTH):
content_length = int(line[len(CONTENT_LENGTH):].strip())
log(f">> [{label}] ce: [{content_length}]")
source.readline()
payload = source.read(content_length).decode()
log(">> [{label}] payload: [{payload}]", label=label, payload=payload)
if handlers:
payload_parsed = json.loads(payload)
for handler in handlers:
try:
handler(payload_parsed)
except Exception as e:
log(f"Error from handler: {e}")
transmit_payload(payload, sink)
except Exception as e:
log(f"[{label}] Error decoding input: {e}")
log(f"[{label}] exit process_messages")
def transmit_payload (payload, sink):
if isinstance(payload, dict):
payload = json.dumps(payload)
sink.write(f"{CONTENT_LENGTH}{len(payload)}\r\n\r\n{payload}".encode())
sink.flush()
class Handlers:
def __init__ (self):
self.handlers = []
def __call__ (self, func):
self.handlers.append(func)
return func
def get_handlers (self, server):
return [lambda payload: handler(server, payload) for handler in self.handlers]
handler = Handlers()
def method (name):
def method_handler (func):
def handle_method (server, payload):
if payload.get("method", None) != name:
return
func(server, payload.get("params"))
return handler(handle_method)
return method_handler
def command (name):
def command_handler (func):
def handle_command (server, payload):
if payload.get("command", None) != name:
return
func(server, payload.get("arguments"))
return method("workspace/executeCommand")(handle_command)
return command_handler
def get_data_home ():
data_directory = os.path.join(xdg_data_home(), "lsp-proxy")
if not os.path.exists(data_directory):
os.makedirs(data_directory)
return data_directory
class LspServer:
def __init__ (self, command):
self.command = command
self.handlers = []
def start (self):
handlers = self.handlers + handler.get_handlers(self)
log("starting lsp process: {}", " ".join(self.command))
with Popen(self.command, stdin=PIPE, stdout=PIPE) as process:
self.process = process
self.process_reader = Thread(target=process_messages, args=["process.stdout -> sys.stdout reader", process.stdout, sys.stdout.buffer, handlers])
self.process_reader.start()
process_messages("sys.stdin -> process.stdin", sys.stdin.buffer, process.stdin, handlers)
log("lsp process ended")
def start2 (self):
run(self.command)
def send_to_client (self, payload):
transmit_payload(payload, sys.stdout.buffer)
def send_to_server (self, payload):
transmit_payload(payload, self.process.stdout)
def close (self):
if self.process:
self.process.terminate()