Source code for labgrid.driver.usbstoragedriver

# pylint: disable=no-member
import enum
import logging
import os
import time
import attr
import subprocess

from ..factory import target_factory
from ..step import step
from ..util.managedfile import ManagedFile
from .common import Driver
from ..driver.exception import ExecutionError

from ..util.helper import processwrapper
from ..util import Timeout

[docs]class Mode(enum.Enum): DD = 1 BMAPTOOL = 2
[docs]@target_factory.reg_driver @attr.s(eq=False) class USBStorageDriver(Driver): bindings = { "storage": { "USBMassStorage", "NetworkUSBMassStorage", "USBSDMuxDevice", "NetworkUSBSDMuxDevice" }, } image = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str)) )
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self.logger = logging.getLogger("{}:{}".format(self,
[docs] def on_activate(self): pass
[docs] def on_deactivate(self): pass
[docs] @Driver.check_active @step(args=['filename']) def write_image(self, filename=None, mode=Mode.DD, partition=None, skip=0, seek=0): """ Writes the file specified by filename or if not specified by config image subkey to the bound USB storage root device or partition. Args: filename (str): optional, path to the image to write to bound USB storage mode (Mode): optional, Mode.DD or Mode.BMAPTOOL (defaults to Mode.DD) partition (int or None): optional, write to the specified partition or None for writing to root device (defaults to None) skip (int): optional, skip n 512-sized blocks at start of input file (defaults to 0) seek (int): optional, skip n 512-sized blocks at start of output (defaults to 0) """ if filename is None and self.image is not None: filename = assert filename, "write_image requires a filename" mf = ManagedFile(filename, mf.sync_to_resource() # wait for medium timeout = Timeout(10.0) while not timeout.expired: try: if self.get_size() > 0: break time.sleep(0.5) except ValueError: # when the medium gets ready the sysfs attribute is empty for a short time span continue else: raise ExecutionError("Timeout while waiting for medium") partition = "" if partition is None else partition if mode == Mode.DD: block_size = '512' if skip or seek else '4M' args = [ "dd", "if={}".format(mf.get_remote_path()), "of={}{}".format(, partition), "oflag=direct", "status=progress", "bs={}".format(block_size), "skip={}".format(skip), "seek={}".format(seek), "conv=fdatasync" ] elif mode == Mode.BMAPTOOL: if skip or seek: raise ExecutionError("bmaptool does not support skip or seek") args = [ "bmaptool", "copy", "{}".format(mf.get_remote_path()), "{}{}".format(, partition), ] else: raise ValueError processwrapper.check_output( + args )
[docs] @Driver.check_active @step(result=True) def get_size(self): args = ["cat", "/sys/class/block/{}/size".format([5:])] size = subprocess.check_output( + args) return int(size)*512
[docs]@target_factory.reg_driver @attr.s(eq=False) class NetworkUSBStorageDriver(USBStorageDriver):
[docs] def __attrs_post_init__(self): import warnings warnings.warn("NetworkUSBStorageDriver is deprecated, use USBStorageDriver instead", DeprecationWarning) super().__attrs_post_init__()