Source code for labgrid.driver.serialdriver

import logging

import attr
from pexpect import TIMEOUT
import serial
import serial.rfc2217

from ..factory import target_factory
from ..protocol import ConsoleProtocol
from .common import Driver
from .consoleexpectmixin import ConsoleExpectMixin
from ..util.proxy import proxymanager
from ..resource import SerialPort


[docs]@target_factory.reg_driver @attr.s(eq=False) class SerialDriver(ConsoleExpectMixin, Driver, ConsoleProtocol): """ Driver implementing the ConsoleProtocol interface over a SerialPort connection """ bindings = {"port": {"SerialPort", "NetworkSerialPort"}, } txdelay = attr.ib(default=0.0, validator=attr.validators.instance_of(float)) timeout = attr.ib(default=3.0, validator=attr.validators.instance_of(float))
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self.logger = logging.getLogger(f"{self}({self.target})") if isinstance(self.port, SerialPort): self.serial = serial.Serial() else: if self.port.protocol == "rfc2217": self.serial = serial.rfc2217.Serial() elif self.port.protocol == "raw": self.serial = serial.serial_for_url("socket://", do_not_open=True) else: raise Exception("SerialDriver: unknown protocol") self.status = 0
[docs] def on_activate(self): if isinstance(self.port, SerialPort): self.serial.port = self.port.port self.serial.baudrate = self.port.speed else: host, port = proxymanager.get_host_and_port(self.port) if self.port.protocol == "rfc2217": self.serial.port = f"rfc2217://{host}:{port}?ign_set_control&timeout={self.timeout}" elif self.port.protocol == "raw": self.serial.port = f"socket://{host}:{port}/" else: raise Exception("SerialDriver: unknown protocol") self.serial.baudrate = self.port.speed self.open()
[docs] def on_deactivate(self): self.close()
[docs] @Driver.check_bound def get_export_vars(self): export_vars = { "speed": str(self.port.speed) } if isinstance(self.port, SerialPort): export_vars["port"] = self.port.port else: host, port = proxymanager.get_host_and_port(self.port) export_vars["host"] = host export_vars["port"] = str(port) export_vars["protocol"] = self.port.protocol return export_vars
def _read(self, size: int = 1, timeout: float = 0.0, max_size: int = None): """ Reads 'size' or more bytes from the serialport Keyword Arguments: size -- amount of bytes to read, defaults to 1 max_size -- maximal amount of bytes to read, values 'None' or '0' do not restrict the read length, defaults to None if size == max_size: read and return exactly size = max_size bytes """ reading = max(size, self.serial.in_waiting) if max_size: # limit reading to max_size if provided reading = min(reading, max_size) self.serial.timeout = timeout res = self.serial.read(reading) if not res: raise TIMEOUT(f"Timeout of {timeout:.2f} seconds exceeded or connection closed by peer") return res def _write(self, data: bytes): """ Writes 'data' to the serialport Arguments: data -- data to write, must be bytes """ return self.serial.write(data)
[docs] def open(self): """Opens the serialport, does nothing if it is already closed""" if not self.status: try: self.serial.open() except serial.SerialException as e: raise serial.SerialException( f"Could not open serial port {self.serial.port}: {str(e)}") from e self.status = 1
[docs] def close(self): """Closes the serialport, does nothing if it is already closed""" if self.status: self.serial.close() self.status = 0
[docs] def __str__(self): return f"SerialDriver({self.target.name})"