Source code for labgrid.util.agent

#!/usr/bin/env python3

import json
import os
import sys
import base64

[docs]def b2s(b): return base64.b85encode(b).decode('ascii')
[docs]def s2b(s): return base64.b85decode(s.encode('ascii'))
[docs]class Agent:
[docs] def __init__(self): self.methods = {}
@staticmethod def _send(data): sys.stdout.write(json.dumps(data)+'\n') sys.stdout.flush()
[docs] def register(self, name, func): assert name not in self.methods self.methods[name] = func
[docs] def run(self): for line in sys.stdin: if not line: continue try: request = json.loads(line) except json.JSONDecodeError: Agent._send({'error': 'request parsing failed'}) break if request.get('close', False): break name = request['method'] args = request['args'] kwargs = request['kwargs'] try: response = self.methods[name](*args, **kwargs) Agent._send({'result': response}) except Exception as e: # pylint: disable=broad-except Agent._send({'exception': repr(e)}) break
[docs]def handle_test(*args, **kwargs): # pylint: disable=unused-argument return args[::-1]
[docs]def handle_error(message): raise RuntimeError(message)
[docs]def handle_usbtmc(index, cmd, read=False): assert isinstance(index, int) cmd = s2b(cmd) fd = os.open('/dev/usbtmc{}'.format(index), os.O_RDWR) os.write(fd, cmd) if not read: os.close(fd) return None data = [] while True: data.append(os.read(fd, 4096)) if len(data[-1]) < 4096: break os.close(fd) return b2s(b''.join(data))
[docs]def main(): a = Agent() a.register('test', handle_test) a.register('error', handle_error) a.register('usbtmc', handle_usbtmc) a.run()
if __name__ == "__main__": main()