Source code for labgrid.driver.sigrokdriver

# pylint: disable=no-member
import logging
import os.path
import re
import subprocess
import shutil
import signal
import tempfile
import time
import uuid
import csv

from time import sleep

import attr

from ..factory import target_factory
from ..protocol import PowerProtocol
from ..resource.remote import NetworkSigrokUSBDevice, NetworkSigrokUSBSerialDevice
from ..resource.udev import SigrokUSBDevice, SigrokUSBSerialDevice
from ..resource.sigrok import SigrokDevice
from ..step import step
from ..util.helper import processwrapper
from .common import Driver, check_file
from .exception import ExecutionError
from .powerdriver import PowerResetMixin


[docs]@attr.s(eq=False) class SigrokCommon(Driver):
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() # FIXME make sure we always have an environment or config if self.target.env: self.tool = self.target.env.config.get_tool( 'sigrok-cli' ) or 'sigrok-cli' else: self.tool = 'sigrok-cli' self.log = logging.getLogger("SigrokDriver") self._running = False
def _create_tmpdir(self): if isinstance(self.sigrok, NetworkSigrokUSBDevice): self._tmpdir = f'/tmp/labgrid-sigrok-{uuid.uuid1()}' command = self.sigrok.command_prefix + [ 'mkdir', '-p', self._tmpdir ] self.log.debug("Tmpdir command: %s", command) subprocess.call( command, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) self.log.debug("Created tmpdir: %s", self._tmpdir) self._local_tmpdir = tempfile.mkdtemp(prefix="labgrid-sigrok-") self.log.debug("Created local tmpdir: %s", self._local_tmpdir) else: self._tmpdir = tempfile.mkdtemp(prefix="labgrid-sigrok-") self.log.debug("created tmpdir: %s", self._tmpdir) def _delete_tmpdir(self): if isinstance(self.sigrok, NetworkSigrokUSBDevice): command = self.sigrok.command_prefix + [ 'rm', '-r', self._tmpdir ] self.log.debug("Tmpdir command: %s", command) subprocess.call( command, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) shutil.rmtree(self._local_tmpdir) else: shutil.rmtree(self._tmpdir)
[docs] def on_activate(self): self._create_tmpdir()
[docs] def on_deactivate(self): self._delete_tmpdir()
def _get_sigrok_prefix(self): prefix = [self.tool] if isinstance(self.sigrok, (NetworkSigrokUSBDevice, SigrokUSBDevice)): prefix += ["-d", f"{self.sigrok.driver}:conn={self.sigrok.busnum}.{self.sigrok.devnum}"] elif isinstance(self.sigrok, (NetworkSigrokUSBSerialDevice, SigrokUSBSerialDevice)): prefix += ["-d", f"{self.sigrok.driver}:conn={self.sigrok.path}"] else: prefix += ["-d", self.sigrok.driver] if self.sigrok.channels: prefix += ["-C", self.sigrok.channels] return self.sigrok.command_prefix + prefix @Driver.check_active @step(title='call', args=['args']) def _call_with_driver(self, *args): combined = self._get_sigrok_prefix() + list(args) self.log.debug("Combined command: %s", " ".join(combined)) self._process = subprocess.Popen( combined, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE ) @Driver.check_active @step(title='call', args=['args']) def _call(self, *args): combined = self.sigrok.command_prefix + [self.tool] if self.sigrok.channels: combined += ["-C", self.sigrok.channels] combined += list(args) self.log.debug("Combined command: %s", combined) self._process = subprocess.Popen( combined, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE )
[docs]@target_factory.reg_driver @attr.s(eq=False) class SigrokDriver(SigrokCommon): """The SigrokDriver uses sigrok-cli to record samples and expose them as python dictionaries. Args: bindings (dict): driver to use with sigrok """ bindings = { "sigrok": {SigrokUSBDevice, NetworkSigrokUSBDevice, SigrokDevice}, }
[docs] @Driver.check_active def capture(self, filename, samplerate="200k"): self._filename = filename self._basename = os.path.basename(self._filename) self.log.debug( "Saving to: %s with basename: %s", self._filename, self._basename ) cmd = [ "-l", "4", "--config", f"samplerate={samplerate}", "--continuous", "-o" ] filename = os.path.join(self._tmpdir, self._basename) cmd.append(filename) self._call_with_driver(*cmd) args = self.sigrok.command_prefix + ['test', '-e', filename] while subprocess.call(args): sleep(0.1) self._running = True
[docs] @Driver.check_active def stop(self): assert self._running self._running = False fnames = ['time'] fnames.extend(self.sigrok.channels.split(',')) csv_filename = f'{os.path.splitext(self._basename)[0]}.csv' self._process.send_signal(signal.SIGINT) stdout, stderr = self._process.communicate() self._process.wait() self.log.debug("stdout:\n %s\n ----- \n stderr:\n %s", stdout, stderr) # Convert from .sr to .csv cmd = [ '-i', os.path.join(self._tmpdir, self._basename), '-O', 'csv', '-o', os.path.join(self._tmpdir, csv_filename) ] self._call(*cmd) self._process.wait() stdout, stderr = self._process.communicate() self.log.debug("stdout:\n %s\n ----- \n stderr:\n %s", stdout, stderr) if isinstance(self.sigrok, NetworkSigrokUSBDevice): subprocess.call([ 'scp', f'{self.sigrok.host}:{os.path.join(self._tmpdir, self._basename)}', os.path.join(self._local_tmpdir, self._filename) ], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # get csv from remote host subprocess.call([ 'scp', f'{self.sigrok.host}:{os.path.join(self._tmpdir, csv_filename)}', os.path.join(self._local_tmpdir, csv_filename) ], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) with open(os.path.join(self._local_tmpdir, csv_filename)) as csv_file: # skip first 5 lines of the csv output, contains metadata and fieldnames for _ in range(0, 5): next(csv_file) return list(csv.DictReader(csv_file, fieldnames=fnames)) else: shutil.copyfile( os.path.join(self._tmpdir, self._basename), self._filename ) with open(os.path.join(self._tmpdir, csv_filename)) as csv_file: # skip first 5 lines of the csv output, contains metadata and fieldnames for _ in range(0, 5): next(csv_file) return list(csv.DictReader(csv_file, fieldnames=fnames))
[docs] @Driver.check_active def analyze(self, args, filename=None): annotation_regex = re.compile(r'(?P<startnum>\d+)-(?P<endnum>\d+) (?P<decoder>[\w\-]+): (?P<annotation>[\w\-]+): (?P<data>".*)') # pylint: disable=line-too-long if not filename and self._filename: filename = self._filename else: filename = os.path.abspath(filename) check_file(filename, command_prefix=self.sigrok.command_prefix) args.insert(0, filename) if isinstance(args, str): args = args.split(" ") args.insert(0, '-i') args.append("--protocol-decoder-samplenum") args.append("-l") args.append("4") combined = self._get_sigrok_prefix() + args output = subprocess.check_output(combined) return [ match.groupdict() for match in re.finditer(annotation_regex, output.decode("utf-8")) ]
[docs]@target_factory.reg_driver @attr.s(eq=False) class SigrokPowerDriver(SigrokCommon, PowerResetMixin, PowerProtocol): """The SigrokPowerDriverDriver uses sigrok-cli to control a PSU and collect measurements. Args: bindings (dict): driver to use with sigrok """ bindings = { "sigrok": {SigrokUSBSerialDevice, NetworkSigrokUSBSerialDevice}, } delay = attr.ib(default=3.0, validator=attr.validators.instance_of(float)) max_voltage = attr.ib( default=None, converter=attr.converters.optional(float), validator=attr.validators.optional(attr.validators.instance_of(float)), ) max_current = attr.ib( default=None, converter=attr.converters.optional(float), validator=attr.validators.optional(attr.validators.instance_of(float)), )
[docs] @Driver.check_active @step() def on(self): processwrapper.check_output( self._get_sigrok_prefix() + ["--config", "enabled=yes", "--set"] )
[docs] @Driver.check_active @step() def off(self): processwrapper.check_output( self._get_sigrok_prefix() + ["--config", "enabled=no", "--set"] )
[docs] @Driver.check_active @step() def cycle(self): self.off() time.sleep(self.delay) self.on()
[docs] @Driver.check_active @step(args=["value"]) def set_voltage_target(self, value): if self.max_voltage is not None and value > self.max_voltage: raise ValueError( f"Requested voltage target({value}) is higher than configured maximum ({self.max_voltage})") # pylint: disable=line-too-long processwrapper.check_output( self._get_sigrok_prefix() + ["--config", f"voltage_target={value:f}", "--set"] )
[docs] @Driver.check_active @step(args=["value"]) def set_current_limit(self, value): if self.max_current is not None and value > self.max_current: raise ValueError( f"Requested current limit ({value}) is higher than configured maximum ({self.max_current})") # pylint: disable=line-too-long processwrapper.check_output( self._get_sigrok_prefix() + ["--config", f"current_limit={value:f}", "--set"] )
[docs] @Driver.check_active @step(result=True) def get(self): out = processwrapper.check_output( self._get_sigrok_prefix() + ["--get", "enabled"] ) if out == b'true': return True elif out == b'false': return False raise ExecutionError(f"Unkown enable status {out}")
[docs] @Driver.check_active @step(result=True) def measure(self): out = processwrapper.check_output( self._get_sigrok_prefix() + ["--show"] ) res = {} for line in out.splitlines(): line = line.strip() if b':' not in line: continue k, v = line.split(b':', 1) if k == b'voltage': res['voltage'] = float(v) elif k == b'current': res['current'] = float(v) if len(res) != 2: raise ExecutionError(f"Cannot parse --show output {out}") return res