Source code for labgrid.driver.lxaiobusdriver

from importlib import import_module

import attr

from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..util.proxy import proxymanager
from .common import Driver
from .exception import ExecutionError
from ..step import step

[docs] @target_factory.reg_driver @attr.s(eq=False) class LXAIOBusPIODriver(Driver, DigitalOutputProtocol): bindings = { "pio": {"LXAIOBusPIO", "NetworkLXAIOBusPIO"}, }
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self._requests = import_module('requests')
[docs] def on_activate(self): # we can only forward if the backend knows which port to use host, port = proxymanager.get_host_and_port(self.pio) self._url = f'http://{host}:{port}/nodes/{self.pio.node}/pins/{self.pio.pin}/'
[docs] @Driver.check_active @step(args=['status']) def set(self, status): if self.pio.invert: status = not status r = self._requests.post( self._url, data={'value': '1' if status else '0'} ) r.raise_for_status() j = r.json() if j["code"] != 0: raise ExecutionError(f"failed to set value: {j['error_message']}")
[docs] @Driver.check_active @step(result=['True']) def get(self): r = self._requests.get(self._url) r.raise_for_status() j = r.json() if j["code"] != 0: raise ExecutionError(f"failed to get value: {j['error_message']}") result = j["result"] if result not in (0, 1): raise ExecutionError(f"invalid input value: {repr(result)}") status = bool(result) if self.pio.invert: status = not status return status