Source code for labgrid.driver.lxaiobusdriver

import attr

import requests

from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..util.proxy import proxymanager
from .common import Driver
from .exception import ExecutionError
from ..step import step

[docs]@target_factory.reg_driver @attr.s(eq=False) class LXAIOBusPIODriver(Driver, DigitalOutputProtocol): bindings = { "pio": {"LXAIOBusPIO", "NetworkLXAIOBusPIO"}, }
[docs] def __attrs_post_init__(self): super().__attrs_post_init__()
[docs] def on_activate(self): # we can only forward if the backend knows which port to use host, port = proxymanager.get_host_and_port(self.pio) self._url = 'http://{}:{}/nodes/{}/pins/{}/'.format( host, port, self.pio.node, self.pio.pin)
[docs] @Driver.check_active @step(args=['status']) def set(self, status): if self.pio.invert: status = not status r = requests.post( self._url, data={'value': '1' if status else '0'} ) r.raise_for_status() j = r.json() if j["code"] != 0: raise ExecutionError("failed to set value: {}".format(j["error_message"]))
[docs] @Driver.check_active @step(result=['True']) def get(self): r = requests.get(self._url) r.raise_for_status() j = r.json() if j["code"] != 0: raise ExecutionError("failed to get value: {}".format(j["error_message"])) result = j["result"] if result not in (0, 1): raise ExecutionError("invalid input value: {}".format(repr(result))) status = bool(result) if self.pio.invert: status = not status return status