Source code for labgrid.driver.serialdriver

import logging
import warnings

import attr
from packaging import version
from pexpect import TIMEOUT
import serial
import serial.rfc2217

from ..factory import target_factory
from ..protocol import ConsoleProtocol
from .common import Driver
from .consoleexpectmixin import ConsoleExpectMixin
from ..util.proxy import proxymanager
from ..resource import SerialPort

[docs]@target_factory.reg_driver @attr.s(eq=False) class SerialDriver(ConsoleExpectMixin, Driver, ConsoleProtocol): """ Driver implementing the ConsoleProtocol interface over a SerialPort connection """ # pyserial 3.2.1 does not support RFC2217 under Python 3 # if version.parse(serial.__version__) <= version.Version('3.2.1'): bindings = {"port": "SerialPort", } else: bindings = {"port": {"SerialPort", "NetworkSerialPort"}, } if version.parse(serial.__version__) != version.Version(''): message = ("The installed pyserial version does not contain important RFC2217 fixes.\n" "You can install the labgrid fork via:\n" "pip uninstall pyserial\n" "pip install\n") # pylint: disable=line-too-long warnings.warn(message) txdelay = attr.ib(default=0.0, validator=attr.validators.instance_of(float)) timeout = attr.ib(default=3.0, validator=attr.validators.instance_of(float))
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self.logger = logging.getLogger(f"{self}({})") if isinstance(self.port, SerialPort): self.serial = serial.Serial() else: if self.port.protocol == "rfc2217": self.serial = serial.rfc2217.Serial() elif self.port.protocol == "raw": self.serial = serial.serial_for_url("socket://", do_not_open=True) else: raise Exception("SerialDriver: unknown protocol") self.status = 0
[docs] def on_activate(self): if isinstance(self.port, SerialPort): self.serial.port = self.port.port self.serial.baudrate = self.port.speed else: host, port = proxymanager.get_host_and_port(self.port) if self.port.protocol == "rfc2217": self.serial.port = f"rfc2217://{host}:{port}?ign_set_control&timeout={self.timeout}" elif self.port.protocol == "raw": self.serial.port = f"socket://{host}:{port}/" else: raise Exception("SerialDriver: unknown protocol") self.serial.baudrate = self.port.speed
[docs] def on_deactivate(self): self.close()
def _read(self, size: int = 1, timeout: float = 0.0): """ Reads 'size' or more bytes from the serialport Keyword Arguments: size -- amount of bytes to read, defaults to 1 """ reading = max(size, self.serial.in_waiting) self.serial.timeout = timeout res = if not res: raise TIMEOUT(f"Timeout of {timeout:.2f} seconds exceeded or connection closed by peer") return res def _write(self, data: bytes): """ Writes 'data' to the serialport Arguments: data -- data to write, must be bytes """ return self.serial.write(data)
[docs] def open(self): """Opens the serialport, does nothing if it is already closed""" if not self.status: try: except serial.SerialException as e: raise serial.SerialException( f"Could not open serial port {self.serial.port}: {str(e)}") from e self.status = 1
[docs] def close(self): """Closes the serialport, does nothing if it is already closed""" if self.status: self.serial.close() self.status = 0