Source code for labgrid.util.agent

#!/usr/bin/env python3

import json
import os
import signal
import sys
import base64
import types
import socket
import pickle

[docs] def b2s(b): return base64.b85encode(b).decode('ascii')
[docs] def s2b(s): return base64.b85decode(s.encode('ascii'))
# Similar to b2s and s2b, py2s and s2py will convert python objects to strings # using pickle. This is important when exact python types need to be passed to # the wrappers, for example tuples. # Only use these to pass parameters from the wrapper to the agent, as # unpickling is not secure with untrusted data.
[docs] def py2s(o): return b2s(pickle.dumps(o))
[docs] def s2py(s): return pickle.loads(s2b(s))
[docs] class Agent:
[docs] def __init__(self): self.methods = {} self.register('load', self.load) self.register('list', self.list) # use real stdin/stdout self.stdin = sys.stdin self.stdout = sys.stdout # use stderr for normal prints sys.stdout = sys.stderr self.fdpass = None if fdpass_env := os.environ.get("LG_FDPASS"): self.fdpass = socket.socket(fileno=int(fdpass_env))
[docs] def send(self, data): self.stdout.write(json.dumps(data)+'\n') self.stdout.flush()
[docs] def register(self, name, func): assert name not in self.methods self.methods[name] = func
[docs] def load(self, name, source): module = types.ModuleType(name) exec(compile(source, f'<loaded {name}>', 'exec'), module.__dict__) for k, v in module.methods.items(): # pylint: disable=no-member self.register(f'{name}.{k}', v)
[docs] def list(self): return list(self.methods.keys())
[docs] def run(self): for line in self.stdin: if not line: continue try: request = json.loads(line) except json.JSONDecodeError: self.send({'error': f'request parsing failed for {repr(line)}'}) break if request.get('close', False): break name = request['method'] args = request['args'] kwargs = request['kwargs'] try: response = self.methods[name](*args, **kwargs) # check if the method returned a file descriptor if isinstance(response, tuple) and len(response) == 2 and hasattr(response[1], 'fileno'): try: if self.fdpass is None: self.send({'error': 'cannot pass returned FD without LG_FDPASS'}) break socket.send_fds(self.fdpass, [b"\0"], (response[1].fileno(),)) self.send({'result': response[0], 'fdpass': True}) finally: response[1].close() else: self.send({'result': response}) except Exception as e: # pylint: disable=broad-except import traceback try: tb = [list(x) for x in traceback.extract_tb(sys.exc_info()[2])] except: tb = None self.send({'exception': repr(e), 'tb': tb})
[docs] def handle_test(*args, **kwargs): # pylint: disable=unused-argument return args[::-1]
[docs] def handle_test_fd(): fd = os.fdopen(os.memfd_create("test_fd")) return ("dummy", fd)
[docs] def handle_error(message): raise ValueError(message)
[docs] def handle_usbtmc(index, cmd, read=False): assert isinstance(index, int) cmd = s2b(cmd) fd = os.open(f'/dev/usbtmc{index}', os.O_RDWR) os.write(fd, cmd) if not read: os.close(fd) return None data = [] while True: data.append(os.read(fd, 4096)) if len(data[-1]) < 4096: break os.close(fd) return b2s(b''.join(data))
[docs] def main(): signal.signal(signal.SIGINT, signal.SIG_IGN) a = Agent() a.register('test', handle_test) a.register('test_fd', handle_test_fd) a.register('error', handle_error) a.register('usbtmc', handle_usbtmc) a.run()
if __name__ == "__main__": main()