Source code for labgrid.util.agentwrapper

import base64
import hashlib
import json
import os.path
import subprocess
import traceback
import logging

[docs]def b2s(b): return base64.b85encode(b).decode('ascii')
[docs]def s2b(s): return base64.b85decode(s.encode('ascii'))
[docs]class AgentError(Exception): pass
[docs]class AgentException(Exception): pass
[docs]class MethodProxy:
[docs] def __init__(self, wrapper, name): self.wrapper = wrapper self.name = name
[docs] def __call__(self, *args, **kwargs): return self.wrapper.call(self.name, *args, **kwargs)
[docs]class ModuleProxy:
[docs] def __init__(self, wrapper, name): self.wrapper = wrapper self.name = name
[docs] def __getattr__(self, name): return MethodProxy(self.wrapper, '{}.{}'.format(self.name, name))
[docs]class AgentWrapper:
[docs] def __init__(self, host=None): self.agent = None self.loaded = {} self.logger = logging.getLogger("ResourceExport({})".format(host)) agent = os.path.join( os.path.abspath(os.path.dirname(__file__)), 'agent.py') if host: # copy agent.py and run via ssh agent_data = open(agent, 'rb').read() agent_hash = hashlib.sha256(agent_data).hexdigest() agent_remote = '.labgrid_agent_{}.py'.format(agent_hash) ssh_opts = 'ssh -x -o ConnectTimeout=5 -o PasswordAuthentication=no'.split() subprocess.check_call( ['rsync', '-e', ' '.join(ssh_opts), '-tq', agent, '{}:{}'.format(host, agent_remote)], ) self.agent = subprocess.Popen( ssh_opts + [host, '--', 'python3', agent_remote], stdin=subprocess.PIPE, stdout=subprocess.PIPE) else: # run locally self.agent = subprocess.Popen( ['python3', agent], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
[docs] def __del__(self): self.close()
[docs] def __getattr__(self, name): return MethodProxy(self, name)
[docs] def call(self, method, *args, **kwargs): request = { 'method': method, 'args': args, 'kwargs': kwargs, } request = json.dumps(request) request = request.encode('ASCII') self.agent.stdin.write(request+b'\n') self.agent.stdin.flush() response = self.agent.stdout.readline() response = response.decode('ASCII') response = json.loads(response) if 'result' in response: return response['result'] elif 'exception' in response: e = response['exception'] # work around BaseException repr change # https://bugs.python.org/issue30399 if e[-2:] == ',)': e = e[:-2] + ')' self.logger.debug("Traceback from agent (most recent call last) for %s:", e) for line in ''.join(traceback.format_list(response['tb'])).splitlines(): self.logger.debug(line) raise AgentException(e) elif 'error' in response: self.agent.wait() self.agent = None raise AgentError(response['error']) raise AgentError("unknown response from agent: {}".format(response))
[docs] def load(self, name): if name in self.loaded: return self.loaded[name] filename = os.path.join( os.path.abspath(os.path.dirname(__file__)), 'agents', '{}.py'.format(name)) source = open(filename, 'r').read() self.call('load', name, source) proxy = ModuleProxy(self, name) self.loaded[name] = proxy return proxy
[docs] def close(self): if self.agent is None: return request = { 'close': True, } request = json.dumps(request) request = request.encode('ASCII') self.agent.stdin.write(request+b'\n') self.agent.stdin.flush() self.agent.wait() self.agent = None