# pylint: disable=no-member
import logging
import re
import shlex
from time import sleep
import attr
from pexpect import TIMEOUT
from ..factory import target_factory
from ..protocol import CommandProtocol, ConsoleProtocol, LinuxBootProtocol
from ..step import step
from ..util import gen_marker, Timeout
from .common import Driver
from .commandmixin import CommandMixin
from .exception import ExecutionError
@target_factory.reg_driver
@attr.s
class BareboxDriver(CommandMixin, Driver, CommandProtocol, LinuxBootProtocol):
    """BareboxDriver - Driver to control barebox via the console.

    BareboxDriver binds on top of a ConsoleProtocol.

    Args:
        prompt (str): The default Barebox Prompt. It is handed to the
            console's ``expect`` and embedded in the pattern used by
            :meth:`run`, so it is treated as a pattern, not a literal.
        autoboot (str): Pattern awaited after the barebox banner that
            signals the autoboot countdown (default ``"stop autoboot"``).
        interrupt (str): String written to the console to interrupt the
            autoboot once ``autoboot`` has been seen (default ``"\\n"``).
    """
    bindings = {"console": ConsoleProtocol, }
    prompt = attr.ib(default="", validator=attr.validators.instance_of(str))
    autoboot = attr.ib(default="stop autoboot", validator=attr.validators.instance_of(str))
    interrupt = attr.ib(default="\n", validator=attr.validators.instance_of(str))

    def __attrs_post_init__(self):
        """Set up the VT100 filter, the logger and the initial status."""
        super().__attrs_post_init__()
        # Matches VT100/ANSI escape sequences so they can be stripped from
        # the captured command output.
        self.re_vt100 = re.compile(
            r'(\x1b\[|\x9b)[^@-_a-z]*[@-_a-z]|\x1b[@-_a-z]'
        )
        self.logger = logging.getLogger("{}:{}".format(self, self.target))
        # 0: no prompt detected yet, 1: prompt detected (see get_status()).
        self._status = 0

    def on_activate(self):
        """Activate the BareboxDriver

        This function checks for a prompt and awaits it if not already active
        """
        self._check_prompt()
        if self._status == 0:
            self._await_prompt()

    def on_deactivate(self):
        """Deactivate the BareboxDriver

        Simply sets the internal status to 0
        """
        self._status = 0

    @Driver.check_active
    @step(args=['cmd'])
    def run(self, cmd: str, *, step):
        """
        Runs the specified command on the shell and returns the output.

        Args:
            cmd (str): command to run on the shell
            step: supplied by the ``step`` decorator; not used in the body

        Returns:
            Tuple[List[str], List[str], int]: ``(stdout lines, [], exit code)``
            if the driver status is 1 (prompt detected), None otherwise.
            The second element is always an empty list.
        """
        # FIXME: Handle pexpect Timeout
        marker = gen_marker()
        # hide marker from expect: the marker is sent as two adjacent quoted
        # strings, so the command line as typed does not contain the
        # contiguous marker, while the text printed by `echo` does.
        hidden_marker = '"{}""{}"'.format(marker[:4], marker[4:])
        # Store the command in /cmd, print the marker, run /cmd, then print
        # the marker again followed by the exit code ($?).
        cmp_command = '''echo -o /cmd {}; echo {}; sh /cmd; echo {} $?;'''.format(
            shlex.quote(cmd), hidden_marker, hidden_marker,
        )
        if self._status != 1:
            return None

        self.console.sendline(cmp_command)
        # Group 1: everything between the two markers (the command output),
        # group 2: the exit code printed after the second marker.
        _, _, match, _ = self.console.expect(r'{}(.*){}\s+(\d+)\s+.*{}'.format(
            marker, marker, self.prompt
        ))
        # Remove VT100 Codes and split by newline; the first and last
        # fragments (the line remainders next to the markers) are dropped.
        data = self.re_vt100.sub('', match.group(1).decode('utf-8')).split('\r\n')[1:-1]
        self.logger.debug("Received Data: %s", data)
        # Get exit code
        exitcode = int(match.group(2))
        return (data, [], exitcode)

    @Driver.check_active
    def run_check(self, cmd: str):
        """
        Runs the specified command on the shell and returns the output if successful,
        raises ExecutionError otherwise.

        Args:
            cmd (str): command to run on the shell

        Returns:
            List[str]: stdout of the executed command

        Raises:
            ExecutionError: if the command exits with a non-zero code or
                could not be run because no prompt is available.
        """
        res = self.run(cmd)
        # run() returns None when no prompt was detected; report that as an
        # ExecutionError instead of failing with a TypeError on res[2].
        if res is None or res[2] != 0:
            raise ExecutionError(cmd)
        return res[0]

    def get_status(self):
        """Retrieve status of the BareboxDriver

        0 means inactive, 1 means active.

        Returns:
            int: status of the driver
        """
        return self._status

    def _check_prompt(self):
        """
        Internal function to check if we have a valid prompt.

        It sends an empty line and waits up to one second for the prompt,
        setting the internal _status to 1 or 0 based on the prompt detection.
        """
        self.console.sendline("")
        try:
            self.console.expect(self.prompt, timeout=1)
            self._status = 1
        except TIMEOUT:
            self._status = 0

    def _await_prompt(self):
        """Await autoboot line and stop it to get to the prompt"""
        # Wait for the barebox version banner first.
        self.console.expect(r"[\n]barebox 20\d+")
        index, _, _, _ = self.console.expect([self.prompt, self.autoboot])
        if index == 0:
            # The prompt showed up on its own.
            self._status = 1
        else:
            # Autoboot message seen: interrupt it and verify the prompt.
            self.console.write(self.interrupt.encode('ASCII'))
            self._check_prompt()

    @Driver.check_active
    def await_boot(self):
        """Wait for the initial Linux version string to verify we successfully
        jumped into the kernel.
        """
        self.console.expect(r"Linux version \d")

    @Driver.check_active
    def boot(self, name: str):
        """Boot the default or a specific boot entry

        Args:
            name (str): name of the entry to boot; a falsy value (e.g. an
                empty string) boots the default entry
        """
        if name:
            self.console.sendline("boot -v {}".format(name))
        else:
            self.console.sendline("boot -v")