Source code for labgrid.driver.openocddriver

# pylint: disable=no-member
import subprocess
import logging
from itertools import chain
import attr

from ..factory import target_factory
from ..protocol import BootstrapProtocol
from ..resource.remote import NetworkAlteraUSBBlaster
from ..resource.udev import AlteraUSBBlaster
from ..step import step
from ..util.managedfile import ManagedFile
from .common import Driver, check_file


[docs]@target_factory.reg_driver @attr.s(cmp=False) class OpenOCDDriver(Driver, BootstrapProtocol): bindings = { "interface": {AlteraUSBBlaster, NetworkAlteraUSBBlaster}, } config = attr.ib(validator=attr.validators.instance_of((str, list))) search = attr.ib( default=[], validator=attr.validators.optional(attr.validators.instance_of((str, list))) ) image = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str)) )
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self.logger = logging.getLogger("{}:{}".format(self, self.target)) self.config = self.resolve_path_str_or_list(self.config) self.search = self.resolve_path_str_or_list(self.search) # FIXME make sure we always have an environment or config if self.target.env: self.tool = self.target.env.config.get_tool('openocd') or 'openocd' else: self.tool = 'openocd'
[docs] def resolve_path_str_or_list(self, path): if isinstance(path, str): if self.target.env: return [self.target.env.config.resolve_path(path)] return [path] elif isinstance(path, list): if self.target.env: return [self.target.env.config.resolve_path(p) for p in path] # fall-through return path
[docs] @Driver.check_active @step(args=['filename']) def load(self, filename=None): if filename is None and self.image is not None: filename = self.target.env.config.get_image_path(self.image) mf = ManagedFile(filename, self.interface) mf.sync_to_resource() managed_configs = [] for config in self.config: mconfig = ManagedFile(config, self.interface) mconfig.sync_to_resource() managed_configs.append(mconfig) cmd = self.interface.command_prefix+[self.tool] cmd += chain.from_iterable(("--search", path) for path in self.search) for mconfig in managed_configs: cmd.append("--file") cmd.append(mconfig.get_remote_path()) cmd += [ "--command", "'init'", "--command", "'bootstrap {}'".format(mf.get_remote_path()), "--command", "'shutdown'", ] subprocess.check_call(cmd)
[docs] @Driver.check_active @step(args=['commands']) def execute(self, commands: list): managed_configs = [] for config in self.config: mconfig = ManagedFile(config, self.interface) mconfig.sync_to_resource() managed_configs.append(mconfig) cmd = self.interface.command_prefix+[self.tool] cmd += chain.from_iterable(("--search", path) for path in self.search) for mconfig in managed_configs: cmd.append("--file") cmd.append(mconfig.get_remote_path()) cmd += chain.from_iterable(("--command", "'{}'".format(command)) for command in commands) subprocess.check_call(cmd)