# Source code for labgrid.driver.serialdriver

import logging

import attr
import serial
import serial.rfc2217
from pexpect import TIMEOUT

from ..factory import target_factory
from ..protocol import ConsoleProtocol
from ..resource import SerialPort, NetworkSerialPort
from .common import Driver
from .consoleexpectmixin import ConsoleExpectMixin


def _release_tuple(version):
    """Return the leading numeric release components of a dotted version.

    Parsing stops at the first component carrying a non-numeric suffix (its
    leading digits, if any, are kept), so a pre-release string such as
    "3.5b0" yields (3, 5) instead of raising ValueError.
    """
    release = []
    for component in version.split('.'):
        suffix = component.lstrip('0123456789')
        digits = component[:len(component) - len(suffix)]
        if digits:
            release.append(int(digits))
        if suffix or not digits:
            # Whatever follows a non-numeric marker is not part of the release.
            break
    return tuple(release)


@target_factory.reg_driver
@attr.s
class SerialDriver(ConsoleExpectMixin, Driver, ConsoleProtocol):
    """
    Driver implementing the ConsoleProtocol interface over a SerialPort connection
    """
    # pyserial 3.2.1 does not support RFC2217 under Python 3
    # https://github.com/pyserial/pyserial/pull/183
    if _release_tuple(serial.__version__) <= (3, 2, 1):
        bindings = {"port": SerialPort, }
    else:
        bindings = {"port": {SerialPort, NetworkSerialPort}, }

    # NOTE(review): txdelay is not read anywhere in this class; presumably it
    # is consumed by ConsoleExpectMixin -- confirm.
    txdelay = attr.ib(default=0.0, validator=attr.validators.instance_of(float))

    def __attrs_post_init__(self):
        """Create the logger and an unopened serial object matching the port type."""
        super().__attrs_post_init__()
        self.logger = logging.getLogger("{}({})".format(self, self.target))
        if isinstance(self.port, SerialPort):
            self.serial = serial.Serial()
        else:
            self.serial = serial.rfc2217.Serial()
        # 1 while this driver has opened the port, 0 otherwise
        self.status = 0

    def on_activate(self):
        """Configure the serial object from the bound port resource and open it."""
        if isinstance(self.port, SerialPort):
            self.serial.port = self.port.port
        else:
            self.serial.port = "rfc2217://{}:{}/".format(self.port.host, self.port.port)
        self.serial.baudrate = self.port.speed
        self.open()

    def _read(self, size: int = 1, timeout: float = 0.0):
        """
        Reads 'size' or more bytes from the serialport

        Keyword Arguments:
        size -- amount of bytes to read, defaults to 1
        timeout -- value assigned to the serial object's read timeout before
                   reading, defaults to 0.0

        Raises:
        TIMEOUT -- if the read returned no data
        """
        # Request everything that is already buffered, but at least 'size' bytes.
        reading = max(size, self.serial.in_waiting)
        self.serial.timeout = timeout
        res = self.serial.read(reading)
        if not res:
            raise TIMEOUT("Timeout of %.2f seconds exceeded" % timeout)
        return res

    def _write(self, data: bytes):
        """
        Writes 'data' to the serialport

        Arguments:
        data -- data to write, must be bytes
        """
        return self.serial.write(data)

    def open(self):
        """Opens the serialport, does nothing if it is already open"""
        if not self.status:
            self.serial.open()
            self.status = 1

    def close(self):
        """Closes the serialport, does nothing if it is already closed"""
        if self.status:
            self.serial.close()
            self.status = 0