Source code for labgrid.driver.usbstoragedriver

import enum
import os
import pathlib
import time
import subprocess

import attr

from ..factory import target_factory
from ..resource.remote import RemoteUSBResource
from ..step import step
from ..util.managedfile import ManagedFile
from .common import Driver
from ..driver.exception import ExecutionError

from ..util.helper import processwrapper
from ..util.agentwrapper import AgentWrapper
from ..util import Timeout


[docs] class Mode(enum.Enum): DD = "dd" BMAPTOOL = "bmaptool"
[docs] def __str__(self): return self.value
[docs] @target_factory.reg_driver @attr.s(eq=False) class USBStorageDriver(Driver): bindings = { "storage": { "USBMassStorage", "NetworkUSBMassStorage", "USBSDMuxDevice", "NetworkUSBSDMuxDevice", "USBSDWireDevice", "NetworkUSBSDWireDevice", }, } image = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str)) ) WAIT_FOR_MEDIUM_TIMEOUT = 10.0 # s WAIT_FOR_MEDIUM_SLEEP = 0.5 # s
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self.wrapper = None self.proxy = None
def _start_wrapper(self): if self.wrapper: return host = self.storage.host if isinstance(self.storage, RemoteUSBResource) else None self.wrapper = AgentWrapper(host) self.proxy = self.wrapper.load('udisks2')
[docs] def on_activate(self): pass
[docs] def on_deactivate(self): if self.wrapper: self.wrapper.close() self.wrapper = None self.proxy = None
[docs] @Driver.check_active @step(args=['sources', 'target', 'partition', 'target_is_directory']) def write_files(self, sources, target, partition, target_is_directory=True): """ Write the file(s) specified by filename(s) to the bound USB storage partition. Args: sources (List[str]): path(s) to the file(s) to be copied to the bound USB storage partition. target (str): target directory or file to copy to partition (int): mount the specified partition or None to mount the whole disk target_is_directory (bool): Whether target is a directory """ self._start_wrapper() self.devpath = self._get_devpath(partition) mount_path = self.proxy.mount(self.devpath) try: # (pathlib.PurePath(...) / "/") == "/", so we turn absolute paths into relative # paths with respect to the mount point here target_rel = target.relative_to(target.root) if target.root is not None else target target_path = str(pathlib.PurePath(mount_path) / target_rel) copied_sources = [] for f in sources: mf = ManagedFile(f, self.storage) mf.sync_to_resource() copied_sources.append(mf.get_remote_path()) if target_is_directory: args = ["cp", "-t", target_path] + copied_sources else: if len(sources) != 1: raise ValueError("single source argument required when target_is_directory=False") args = ["cp", "-T", copied_sources[0], target_path] processwrapper.check_output(self.storage.command_prefix + args) self.proxy.unmount(self.devpath) except: # We are going to die with an exception anyway, so no point in waiting # to make sure everything has been written before continuing self.proxy.unmount(self.devpath, lazy=True) raise
[docs] @Driver.check_active @step(args=['filename']) def write_image(self, filename=None, mode=Mode.DD, partition=None, skip=0, seek=0): """ Writes the file specified by filename or if not specified by config image subkey to the bound USB storage root device or partition. Args: filename (str): optional, path to the image to write to bound USB storage mode (Mode): optional, Mode.DD or Mode.BMAPTOOL (defaults to Mode.DD) partition (int or None): optional, write to the specified partition or None for writing to root device (defaults to None) skip (int): optional, skip n 512-sized blocks at start of input file (defaults to 0) seek (int): optional, skip n 512-sized blocks at start of output (defaults to 0) """ if filename is None and self.image is not None: filename = self.target.env.config.get_image_path(self.image) assert filename, "write_image requires a filename" mf = ManagedFile(filename, self.storage) mf.sync_to_resource() self._wait_for_medium(partition) target = self._get_devpath(partition) remote_path = mf.get_remote_path() if mode == Mode.DD: self.logger.info('Writing %s to %s using dd.', remote_path, target) block_size = '512' if skip or seek else '4M' args = [ "dd", f"if={remote_path}", f"of={target}", "oflag=direct", "status=progress", f"bs={block_size}", f"skip={skip}", f"seek={seek}", "conv=fdatasync" ] elif mode == Mode.BMAPTOOL: if skip or seek: raise ExecutionError("bmaptool does not support skip or seek") # Try to find a block map file using the same logic that bmaptool # uses. Handles cases where the image is named like: <image>.bz2 # and the block map file is <image>.bmap mf_bmap = None image_path = filename while True: bmap_path = f"{image_path}.bmap" if os.path.exists(bmap_path): mf_bmap = ManagedFile(bmap_path, self.storage) mf_bmap.sync_to_resource() break image_path, ext = os.path.splitext(image_path) if not ext: break self.logger.info('Writing %s to %s using bmaptool.', remote_path, target) args = [ "bmaptool", "copy", f"{remote_path}", f"{target}", ] if mf_bmap is None: args.append("--nobmap") else: args.append(f"--bmap={mf_bmap.get_remote_path()}") else: raise ValueError processwrapper.check_output( self.storage.command_prefix + args, print_on_silent_log=True )
def _get_devpath(self, partition): partition = "" if partition is None else partition # simple concatenation is sufficient for USB mass storage return f"{self.storage.path}{partition}" @Driver.check_active def _wait_for_medium(self, partition): timeout = Timeout(self.WAIT_FOR_MEDIUM_TIMEOUT) while not timeout.expired: if self.get_size(partition) > 0: break time.sleep(self.WAIT_FOR_MEDIUM_SLEEP) else: raise ExecutionError("Timeout while waiting for medium")
[docs] @Driver.check_active @step(args=['partition'], result=True) def get_size(self, partition=None): """ Get the size of the bound USB storage root device or partition. Args: partition (int or None): optional, get size of the specified partition or None for getting the size of the root device (defaults to None) Returns: int: size in bytes """ args = ["cat", f"/sys/class/block/{self._get_devpath(partition)[5:]}/size"] size = subprocess.check_output(self.storage.command_prefix + args) try: return int(size) * 512 except ValueError: # when the medium gets ready the sysfs attribute is empty for a short time span return 0
[docs] @target_factory.reg_driver @attr.s(eq=False) class NetworkUSBStorageDriver(USBStorageDriver):
[docs] def __attrs_post_init__(self): import warnings warnings.warn("NetworkUSBStorageDriver is deprecated, use USBStorageDriver instead", DeprecationWarning) super().__attrs_post_init__()