#!/usr/bin/env python3
import json
import os
import signal
import sys
import base64
import types
import socket
import pickle
def b2s(b):
    """Encode the bytes *b* as an ASCII Base85 string.

    The result is plain text, so it can be embedded in the JSON messages
    exchanged with the agent.
    """
    encoded = base64.b85encode(b)
    return str(encoded, 'ascii')
def s2b(s):
    """Decode the ASCII Base85 string *s* back into bytes (inverse of b2s)."""
    ascii_bytes = s.encode('ascii')
    return base64.b85decode(ascii_bytes)
# Similar to b2s and s2b, py2s and s2py convert Python objects to and from
# strings using pickle. This is important when exact Python types, for example
# tuples, need to be passed to the wrappers.
# Only use these to pass parameters from the wrapper to the agent, as
# unpickling is not secure with untrusted data.
def py2s(o):
    """Serialize the Python object *o* to a string: pickle, then Base85."""
    pickled = pickle.dumps(o)
    return b2s(pickled)
def s2py(s):
    """Rebuild a Python object from a string produced by py2s.

    WARNING: this unpickles the decoded data. Unpickling is not secure with
    untrusted input, so only feed it strings that come from a trusted peer.
    """
    pickled = s2b(s)
    return pickle.loads(pickled)
class Agent:
    """Line-oriented JSON request/response server on stdin/stdout.

    Every request is a single line holding a JSON object with the keys
    ``method``, ``args`` and ``kwargs``; a request whose ``close`` key is true
    ends the loop. Every reply is a single line holding a JSON object with
    either ``result`` (optionally ``fdpass``), ``exception`` and ``tb``, or
    ``error``.
    """

    def __init__(self):
        """Create the method table and take over the real stdin/stdout.

        ``sys.stdout`` is rebound to ``sys.stderr`` so that ordinary prints
        cannot end up in the reply stream. If the ``LG_FDPASS`` environment
        variable is set, its integer value is used as the file descriptor of
        an already open socket over which file descriptors are passed.
        """
        self.methods = {}
        self.register('load', self.load)
        self.register('list', self.list)

        # use real stdin/stdout
        self.stdin = sys.stdin
        self.stdout = sys.stdout

        # use stderr for normal prints
        sys.stdout = sys.stderr

        self.fdpass = None
        if fdpass_env := os.environ.get("LG_FDPASS"):
            self.fdpass = socket.socket(fileno=int(fdpass_env))

    def send(self, data):
        """Write *data* as one JSON line to the reply stream and flush it."""
        self.stdout.write(json.dumps(data)+'\n')
        self.stdout.flush()

    def register(self, name, func):
        """Make *func* callable by requests under the method name *name*.

        Raises:
            AssertionError: if *name* is already registered.
        """
        # Raised explicitly instead of using ``assert`` so that the check is
        # not stripped when Python runs with -O; the exception type is the
        # same as before.
        if name in self.methods:
            raise AssertionError(f'method {name!r} is already registered')
        self.methods[name] = func

    def load(self, name, source):
        """Execute *source* as module *name* and register its methods.

        The executed code must define a ``methods`` mapping; each entry is
        registered as ``<name>.<key>``.

        NOTE: this runs arbitrary code received in a request, so the peer
        writing to stdin must be trusted.
        """
        module = types.ModuleType(name)
        exec(compile(source, f'<loaded {name}>', 'exec'), module.__dict__)
        for k, v in module.methods.items():  # pylint: disable=no-member
            self.register(f'{name}.{k}', v)

    def list(self):
        """Return the names of all registered methods."""
        return list(self.methods)

    def run(self):
        """Serve requests from stdin until EOF, ``close`` or a fatal error.

        Blank lines are skipped. A line that is not a JSON object produces an
        ``error`` reply and ends the loop. Any exception raised while
        dispatching a request (including a missing ``method`` key or an
        unknown method name) is reported as an ``exception`` reply and the
        loop continues. Missing ``args``/``kwargs`` default to empty.

        If a method returns a 2-tuple whose second item has a ``fileno``
        attribute, that descriptor is passed over the ``LG_FDPASS`` socket and
        the first item is sent as the result; without such a socket an
        ``error`` reply is sent and the loop ends.
        """
        for line in self.stdin:
            # Lines read from a stream keep their trailing newline, so the
            # stripped text has to be tested to really skip blank lines.
            if not line.strip():
                continue

            try:
                request = json.loads(line)
            except json.JSONDecodeError:
                request = None
            # Valid JSON that is not an object (list, number, null, ...) is
            # handled like a parse failure instead of crashing on .get().
            if not isinstance(request, dict):
                self.send({'error': f'request parsing failed for {repr(line)}'})
                break

            if request.get('close', False):
                break

            try:
                # Looked up inside the try block so that an incomplete
                # request yields an 'exception' reply rather than killing
                # the agent without any answer.
                name = request['method']
                args = request.get('args', ())
                kwargs = request.get('kwargs', {})
                response = self.methods[name](*args, **kwargs)

                # check if the method returned a file descriptor
                if isinstance(response, tuple) and len(response) == 2 and hasattr(response[1], 'fileno'):
                    try:
                        if self.fdpass is None:
                            self.send({'error': 'cannot pass returned FD without LG_FDPASS'})
                            break
                        socket.send_fds(self.fdpass, [b"\0"], (response[1].fileno(),))
                        self.send({'result': response[0], 'fdpass': True})
                    finally:
                        # our copy is no longer needed, whether or not the
                        # descriptor was handed over
                        response[1].close()
                else:
                    self.send({'result': response})
            except Exception as e:  # pylint: disable=broad-except
                import traceback
                # The traceback is best effort: the exception itself is
                # still reported if it cannot be extracted.
                try:
                    tb = [list(x) for x in traceback.extract_tb(e.__traceback__)]
                except Exception:  # pylint: disable=broad-except
                    tb = None
                self.send({'exception': repr(e), 'tb': tb})
def handle_test(*args, **kwargs):  # pylint: disable=unused-argument
    """Return the positional arguments in reverse order as a tuple.

    Keyword arguments are accepted and ignored.
    """
    return tuple(reversed(args))
[docs]
def handle_test_fd():
fd = os.fdopen(os.memfd_create("test_fd"))
return ("dummy", fd)
def handle_error(message):
    """Always fail: raise ValueError carrying *message*."""
    failure = ValueError(message)
    raise failure
def handle_usbtmc(index, cmd, read=False):
    """Write a command to ``/dev/usbtmc<index>`` and optionally read a reply.

    Args:
        index: integer number of the usbtmc device node.
        cmd: the command bytes, Base85-encoded as a string (see s2b).
        read: when true, read the device's answer after writing.

    Returns:
        None when *read* is false, otherwise the data read from the device,
        Base85-encoded as a string (see b2s).

    Raises:
        AssertionError: if *index* is not an int.
    """
    # *index* becomes part of a device path, so the type check must survive
    # ``python -O``; it is raised explicitly with the same exception type the
    # former ``assert`` produced.
    if not isinstance(index, int):
        raise AssertionError(f'index must be an int, got {type(index).__name__}')

    payload = s2b(cmd)
    fd = os.open(f'/dev/usbtmc{index}', os.O_RDWR)
    # try/finally guarantees the descriptor is released even when the write
    # or a read fails.
    try:
        os.write(fd, payload)
        if not read:
            return None

        chunks = []
        while True:
            chunk = os.read(fd, 4096)
            chunks.append(chunk)
            # a read shorter than the requested size ends the reply
            if len(chunk) < 4096:
                break
        return b2s(b''.join(chunks))
    finally:
        os.close(fd)
def main():
    """Start an agent with the built-in handlers and serve requests.

    SIGINT is ignored for the lifetime of the process.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    agent = Agent()
    builtin_handlers = (
        ('test', handle_test),
        ('test_fd', handle_test_fd),
        ('error', handle_error),
        ('usbtmc', handle_usbtmc),
    )
    for method_name, handler in builtin_handlers:
        agent.register(method_name, handler)
    agent.run()


# Serve requests only when executed as a script, not when imported.
if __name__ == "__main__":
    main()