Source code for labgrid.util.agents.deditec_relais8

"""
This module implements the communication protocol to switch the digital outputs
on the Deditec Relais8 board.

Supported modules:

- Relais8

Supported Functionality:

- Turn digital output on and off
"""
import logging

from binascii import unhexlify, hexlify

import usb.core
import usb.util


class Relais8:
    """USB driver for the digital outputs of a Deditec Relais8 board.

    Every request sent to the board is framed as ``_padding`` + one sequence
    byte + payload. The sequence byte is an 8-bit counter that ``write()``
    advances after each transfer.
    """

    # Outputs are numbered 1.._num_outputs; each maps to one bit of a byte.
    _num_outputs = 8
    # Hex-encoded 9-byte preamble prepended to every request.
    _padding = 'FEFEFEFEFEFEFEFEFF'
    # Hex-encoded bytes appended after the address of a configuration request.
    _post_padding = '000000'
    # Hex-encoded configuration requests issued once, in order, by __init__.
    _configuration_sequence = [
        '3430FF', '3434FF', '34C0FF', '3400FF', '3402FF', '340CFF', '340EFF',
        '3404FF', '3406FF', '3408FF', '340AFF', '3410FF', '3412FF', '3414FF',
        '3416FF', '34F4FF', '34F5FF', '34F6FF', '34F7FF', '34F0FF', '34ECFF',
    ]

    def __init__(self, **args):
        """Find, claim and configure the board.

        Args:
            **args: Forwarded unchanged to ``usb.core.find()`` to select the
                device (the module-level handlers pass ``bus`` and
                ``address``).

        Raises:
            ValueError: If no matching device is found, or if interface
                (0, 0) lacks an IN or an OUT endpoint.
        """
        self._dev = usb.core.find(**args)
        if self._dev is None:
            raise ValueError("Device not found")

        if self._dev.is_kernel_driver_active(0):
            self._dev.detach_kernel_driver(0)

        cfg = self._dev.get_active_configuration()
        intf = cfg[(0, 0)]

        self._dev.set_configuration()
        usb.util.claim_interface(self._dev, 0)

        # bmRequestType 64 (0x40) is a vendor-specific host-to-device request.
        # NOTE(review): requests 0, 3 and 9 look like FTDI-style reset,
        # baud-rate and latency-timer setup -- confirm against the chip docs.
        self._dev.ctrl_transfer(64, 0, 0, 0, 0, 5000)
        self._dev.ctrl_transfer(64, 3, 20, 0, 0, 5000)
        self._dev.ctrl_transfer(64, 9, 2, 0, 0, 5000)

        def find_endpoint(direction):
            return usb.util.find_descriptor(
                intf,
                custom_match=lambda e:
                usb.util.endpoint_direction(e.bEndpointAddress) == direction,
            )

        self._ep_out = find_endpoint(usb.util.ENDPOINT_OUT)
        self._ep_in = find_endpoint(usb.util.ENDPOINT_IN)
        if self._ep_out is None or self._ep_in is None:
            # Fail here instead of with an AttributeError on the first
            # transfer.
            raise ValueError("Device endpoints not found")

        self._sequence_number = 0
        self._logger = logging.getLogger("Device: ")
        self._run_configuration_sequence()

    def _run_configuration_sequence(self):
        """Send every configuration request and read the reply to each."""
        for address in Relais8._configuration_sequence:
            self.write_config_request(address)
            self.read(255)

    def _frame(self, payload_hex):
        """Return padding + current sequence byte + payload as bytes.

        Args:
            payload_hex: Hex string holding the request payload.
        """
        return b''.join((
            unhexlify(Relais8._padding),
            bytes([self._sequence_number]),
            unhexlify(payload_hex),
        ))

    def _output_mask(self, number):
        """Return the single-bit mask for output ``number`` (1-based).

        Raises:
            ValueError: If ``number`` is outside 1.._num_outputs. Without this
                check a too-large number yields a mask wider than one byte,
                which corrupts the request frame.
        """
        if not 1 <= number <= Relais8._num_outputs:
            raise ValueError(
                f"Output number must be in 1..{Relais8._num_outputs}, "
                f"got {number!r}"
            )
        return 1 << (number - 1)

    def read(self, size):
        """Read up to ``size`` bytes from the IN endpoint and return them."""
        data = self._ep_in.read(size)
        self._logger.debug("Read data: %s", hexlify(data))
        return data

    def write_config_request(self, address):
        """Send one framed configuration request.

        Args:
            address: Hex string; ``_post_padding`` is appended to it to form
                the payload.
        """
        complete_request = self._frame(address + Relais8._post_padding)
        self._logger.debug("Sending Request: %s", hexlify(complete_request))
        self.write(complete_request)

    def write(self, data):
        """Write raw ``data`` to the OUT endpoint.

        The sequence counter is advanced (modulo 256) before the transfer, so
        it moves on even if the transfer itself raises.
        """
        self._sequence_number = (self._sequence_number + 1) & 0xff
        self._logger.debug("Writing data: %s", hexlify(data))
        self._ep_out.write(data)

    def set_all_outputs(self, status, reset=False):
        """Send one request carrying an 8-bit value for all outputs.

        Args:
            status: Integer in 0..255, one bit per output. Values outside
                that range are silently ignored (nothing is sent).
            reset: Selects the ``0`` variant of the request instead of the
                ``8`` variant. Forced to True when ``status`` is 0.
        """
        assert isinstance(status, int)
        if status == 0:
            reset = True
        if status > 255 or status < 0:
            return
        # NOTE(review): unlike set_output()/get_output(), this frame carries
        # a fixed sequence byte of 01 rather than self._sequence_number --
        # confirm with hardware before changing the bytes on the wire.
        data = f'0123{0 if reset else 8}000000001{status:02X}00000000000000'
        self.write(unhexlify(Relais8._padding + data))
        self.read(255)

    def set_output(self, number, status):
        """Switch a single output on or off.

        Args:
            number: Output number, 1.._num_outputs.
            status: Truthy sends the ``2380`` request, falsy the ``23A0``
                request, each with the output's bit mask.

        Raises:
            ValueError: If ``number`` is out of range.
        """
        mask = self._output_mask(number)
        opcode = '2380' if status else '23A0'
        self.write(self._frame(f'{opcode}00000001{mask:02X}00000000000000'))
        self.read(255)

    def get_output(self, number):
        """Return True if output ``number`` is reported as on.

        Args:
            number: Output number, 1.._num_outputs.

        Raises:
            ValueError: If ``number`` is out of range.
            IOError: If the reply lacks the 0x1a marker, is truncated, or
                does not echo the sequence byte of this request.
        """
        mask = self._output_mask(number)
        # Remember the sequence byte actually sent. Deriving it afterwards as
        # "counter - 1" yields -1 once the 8-bit counter has wrapped to 0.
        request_sequence = self._sequence_number
        self.write(self._frame('34000000000F'))
        read_data = self.read(255).tobytes()

        index = read_data.find(b'\x1a')
        if (
            index < 0
            or index + 2 >= len(read_data)
            or read_data[index + 1] != request_sequence
        ):
            raise IOError("Could not read outputs from device")
        return (read_data[index + 2] & mask) > 0

    def __del__(self):
        """Release the claimed USB interface, if a device was opened."""
        # _dev is None or missing when __init__ failed before or during the
        # device lookup; there is nothing to release in that case.
        dev = getattr(self, '_dev', None)
        if dev is not None:
            usb.util.release_interface(dev, 0)
def handle_set(busnum, devnum, number, status):
    """Open the board at USB bus ``busnum`` / address ``devnum`` and switch
    output ``number`` according to ``status``.

    A fresh ``Relais8`` is created for the call and dropped afterwards.
    """
    Relais8(bus=busnum, address=devnum).set_output(number, status)
def handle_get(busnum, devnum, number):
    """Open the board at USB bus ``busnum`` / address ``devnum`` and return
    the state of output ``number`` as reported by ``Relais8.get_output``.

    A fresh ``Relais8`` is created for the call and dropped afterwards.
    """
    board = Relais8(bus=busnum, address=devnum)
    state = board.get_output(number)
    return state
# Dispatch table mapping method names to the handler functions of this module.
methods = dict(set=handle_set, get=handle_get)