Source code for labgrid.driver.serialdriver

import logging
import warnings

import attr
from packaging import version
from pexpect import TIMEOUT
import serial
import serial.rfc2217

from ..factory import target_factory
from ..protocol import ConsoleProtocol
from .common import Driver
from .consoleexpectmixin import ConsoleExpectMixin
from ..util.proxy import proxymanager
from ..resource import SerialPort


@target_factory.reg_driver
@attr.s(eq=False)
class SerialDriver(ConsoleExpectMixin, Driver, ConsoleProtocol):
    """Console driver that talks to a target over a serial connection.

    Implements the ConsoleProtocol interface using pyserial. The bound
    ``port`` resource is either a local ``SerialPort`` or a network-exported
    port that is reached through the "rfc2217" or "raw" protocol.
    """

    # pyserial 3.2.1 does not support RFC2217 under Python 3
    # https://github.com/pyserial/pyserial/pull/183
    if version.parse(serial.__version__) > version.Version('3.2.1'):
        bindings = {"port": {"SerialPort", "NetworkSerialPort"}, }
    else:
        # Too old for RFC2217: only local serial ports can be bound.
        bindings = {"port": "SerialPort", }

    # Warn at class-definition time unless exactly the labgrid pyserial fork
    # release (3.4.0.1) is installed.
    if version.parse(serial.__version__) != version.Version('3.4.0.1'):
        message = ("The installed pyserial version does not contain important RFC2217 fixes.\n"
                   "You can install the labgrid fork via:\n"
                   "pip uninstall pyserial\n"
                   "pip install https://github.com/labgrid-project/pyserial/archive/v3.4.0.1.zip#egg=pyserial\n")  # pylint: disable=line-too-long
        warnings.warn(message)

    # txdelay is not referenced anywhere in this class body; presumably it is
    # consumed by ConsoleExpectMixin -- TODO confirm.
    txdelay = attr.ib(default=0.0, validator=attr.validators.instance_of(float))
    # timeout is embedded as the "timeout" option of the rfc2217:// URL.
    timeout = attr.ib(default=3.0, validator=attr.validators.instance_of(float))

    def __attrs_post_init__(self):
        """Create the logger and an unopened pyserial object for the port.

        Raises:
            Exception: if a non-SerialPort resource uses a protocol other
                than "rfc2217" or "raw".
        """
        super().__attrs_post_init__()
        self.logger = logging.getLogger("{}({})".format(self, self.target))

        if isinstance(self.port, SerialPort):
            self.serial = serial.Serial()
        elif self.port.protocol == "rfc2217":
            self.serial = serial.rfc2217.Serial()
        elif self.port.protocol == "raw":
            # Build the socket handler without connecting; the real URL is
            # filled in by on_activate().
            self.serial = serial.serial_for_url("socket://", do_not_open=True)
        else:
            raise Exception("SerialDriver: unknown protocol")

        # 0 = port closed, 1 = port opened by this driver (see open()/close()).
        self.status = 0

    def on_activate(self):
        """Configure port and baudrate on the serial object, then open it.

        For a local SerialPort the device path is used directly. Otherwise
        host and port come from ``proxymanager.get_host_and_port`` and are
        formatted into an ``rfc2217://`` or ``socket://`` URL.

        Raises:
            Exception: if the resource protocol is neither "rfc2217" nor
                "raw".
            serial.SerialException: if the port cannot be opened.
        """
        if isinstance(self.port, SerialPort):
            port_spec = self.port.port
        else:
            remote_host, remote_port = proxymanager.get_host_and_port(self.port)
            if self.port.protocol == "rfc2217":
                port_spec = "rfc2217://{}:{}?ign_set_control&timeout={}".format(
                    remote_host, remote_port, self.timeout)
            elif self.port.protocol == "raw":
                port_spec = "socket://{}:{}/".format(remote_host, remote_port)
            else:
                raise Exception("SerialDriver: unknown protocol")

        self.serial.port = port_spec
        self.serial.baudrate = self.port.speed
        self.open()

    def on_deactivate(self):
        """Close the serial port when the driver is deactivated."""
        self.close()

    def _read(self, size: int = 1, timeout: float = 0.0):
        """Read from the serial port.

        The read requests ``size`` bytes, or every byte already waiting in
        the input buffer if that is more.

        Keyword Arguments:
        size -- minimum amount of bytes to request, defaults to 1
        timeout -- value assigned to the pyserial read timeout, defaults to 0.0

        Raises:
            TIMEOUT: if the read returned no data.
        """
        wanted = max(size, self.serial.in_waiting)
        self.serial.timeout = timeout
        data = self.serial.read(wanted)
        if data:
            return data
        raise TIMEOUT("Timeout of %.2f seconds exceeded or connection closed by peer" % timeout)

    def _write(self, data: bytes):
        """Write ``data`` to the serial port.

        Arguments:
        data -- data to write, must be bytes

        Returns whatever the underlying ``serial.write`` call returns.
        """
        return self.serial.write(data)

    def open(self):
        """Open the serial port; does nothing if it is already open.

        Raises:
            serial.SerialException: if opening fails. The port name is added
                to the message and the original error is chained.
        """
        if self.status:
            return
        try:
            self.serial.open()
        except serial.SerialException as err:
            raise serial.SerialException(
                "Could not open serial port {}: {}".format(self.serial.port, str(err))) from err
        self.status = 1

    def close(self):
        """Close the serial port; does nothing if it is already closed."""
        if not self.status:
            return
        self.serial.close()
        self.status = 0