Source code for labgrid.driver.sigrokdriver

# pylint: disable=no-member
import logging
import os.path
import re
import subprocess
import shutil
import signal
import tempfile
import uuid
import csv

from time import sleep

import attr

from ..factory import target_factory
from ..resource.remote import NetworkSigrokUSBDevice
from ..resource.udev import SigrokUSBDevice
from ..resource.sigrok import SigrokDevice
from ..step import step
from .common import Driver, check_file


[docs]@target_factory.reg_driver @attr.s(cmp=False) class SigrokDriver(Driver): """The SigrokDriver uses sigrok-cli to record samples and expose them as python dictionaries. Args: bindings (dict): driver to use with sigrok """ bindings = { "sigrok": {SigrokUSBDevice, NetworkSigrokUSBDevice, SigrokDevice}, }
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() # FIXME make sure we always have an environment or config if self.target.env: self.tool = self.target.env.config.get_tool( 'sigrok-cli' ) or 'sigrok-cli' else: self.tool = 'sigrok-cli' self.log = logging.getLogger("SigrokDriver") self._running = False
def _get_sigrok_prefix(self): if isinstance(self.sigrok, (NetworkSigrokUSBDevice, SigrokUSBDevice)): prefix = [ self.tool, "-d", "{}:conn={}.{}".format( self.sigrok.driver, self.sigrok.busnum, self.sigrok.devnum ), "-C", self.sigrok.channels ] else: prefix = [ self.tool, "-d", self.sigrok.driver, "-C", self.sigrok.channels ] return self.sigrok.command_prefix + prefix def _create_tmpdir(self): if isinstance(self.sigrok, NetworkSigrokUSBDevice): self._tmpdir = '/tmp/labgrid-sigrok-{}'.format(uuid.uuid1()) command = self.sigrok.command_prefix + [ 'mkdir', '-p', self._tmpdir ] self.log.debug("Tmpdir command: %s", command) subprocess.call( command, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) self.log.debug("Created tmpdir: %s", self._tmpdir) self._local_tmpdir = tempfile.mkdtemp(prefix="labgrid-sigrok-") self.log.debug("Created local tmpdir: %s", self._local_tmpdir) else: self._tmpdir = tempfile.mkdtemp(prefix="labgrid-sigrok-") self.log.debug("created tmpdir: %s", self._tmpdir) def _delete_tmpdir(self): if isinstance(self.sigrok, NetworkSigrokUSBDevice): command = self.sigrok.command_prefix + [ 'rm', '-r', self._tmpdir ] self.log.debug("Tmpdir command: %s", command) subprocess.call( command, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) shutil.rmtree(self._local_tmpdir) else: shutil.rmtree(self._tmpdir)
[docs] def on_activate(self): self._create_tmpdir()
[docs] def on_deactivate(self): self._delete_tmpdir()
@Driver.check_active @step(title='call', args=['args']) def _call_with_driver(self, *args): combined = self._get_sigrok_prefix() + list(args) self.log.debug("Combined command: %s", " ".join(combined)) self._process = subprocess.Popen( combined, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE ) @Driver.check_active @step(title='call', args=['args']) def _call(self, *args): combined = self.sigrok.command_prefix + [ self.tool, "-C", self.sigrok.channels ] + list(args) self.log.debug("Combined command: %s", combined) self._process = subprocess.Popen( combined, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE )
[docs] @Driver.check_active def capture(self, filename, samplerate="200k"): self._filename = filename self._basename = os.path.basename(self._filename) self.log.debug( "Saving to: %s with basename: %s", self._filename, self._basename ) cmd = [ "-l", "4", "--config", "samplerate={}".format(samplerate), "--continuous", "-o" ] filename = os.path.join(self._tmpdir, self._basename) cmd.append(filename) self._call_with_driver(*cmd) args = self.sigrok.command_prefix + ['test', '-e', filename] while subprocess.call(args): sleep(0.1) self._running = True
[docs] @Driver.check_active def stop(self): assert self._running self._running = False fnames = ['time'] fnames.extend(self.sigrok.channels.split(',')) csv_filename = '{}.csv'.format(os.path.splitext(self._basename)[0]) self._process.send_signal(signal.SIGINT) stdout, stderr = self._process.communicate() self._process.wait() self.log.debug("stdout:\n %s\n ----- \n stderr:\n %s", stdout, stderr) # Convert from .sr to .csv cmd = [ '-i', os.path.join(self._tmpdir, self._basename), '-O', 'csv', '-o', os.path.join(self._tmpdir, csv_filename) ] self._call(*cmd) self._process.wait() stdout, stderr = self._process.communicate() self.log.debug("stdout:\n %s\n ----- \n stderr:\n %s", stdout, stderr) if isinstance(self.sigrok, NetworkSigrokUSBDevice): subprocess.call([ 'scp', '{}:{}'.format( self.sigrok.host, os.path.join(self._tmpdir, self._basename) ), os.path.join(self._local_tmpdir, self._filename) ], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # get csv from remote host subprocess.call([ 'scp', '{}:{}'.format( self.sigrok.host, os.path.join(self._tmpdir, csv_filename) ), os.path.join(self._local_tmpdir, csv_filename) ], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) with open(os.path.join(self._local_tmpdir, csv_filename)) as csv_file: # skip first 5 lines of the csv output, contains metadata and fieldnames for _ in range(0, 5): next(csv_file) return [x for x in csv.DictReader(csv_file, fieldnames=fnames)] else: shutil.copyfile( os.path.join(self._tmpdir, self._basename), self._filename ) with open(os.path.join(self._tmpdir, csv_filename)) as csv_file: # skip first 5 lines of the csv output, contains metadata and fieldnames for _ in range(0, 5): next(csv_file) return [x for x in csv.DictReader(csv_file, fieldnames=fnames)]
[docs] @Driver.check_active def analyze(self, args, filename=None): annotation_regex = re.compile(r'(?P<startnum>\d+)-(?P<endnum>\d+) (?P<decoder>[\w\-]+): (?P<annotation>[\w\-]+): (?P<data>".*)') # pylint: disable=line-too-long if not filename and self._filename: filename = self._filename else: filename = os.path.abspath(filename) check_file(filename, command_prefix=self.sigrok.command_prefix) args.insert(0, filename) if isinstance(args, str): args = args.split(" ") args.insert(0, '-i') args.append("--protocol-decoder-samplenum") args.append("-l") args.append("4") combined = self._get_sigrok_prefix() + args output = subprocess.check_output(combined) return [ match.groupdict() for match in re.finditer(annotation_regex, output.decode("utf-8")) ]