Source code for labgrid.driver.qemudriver

"""The QEMUDriver implements a driver to use a QEMU target"""
import atexit
import select
import shlex
import shutil
import socket
import subprocess
import re
import tempfile
import time

import attr
from pexpect import TIMEOUT

from ..factory import target_factory
from ..protocol import PowerProtocol, ConsoleProtocol
from ..step import step
from .common import Driver
from .consoleexpectmixin import ConsoleExpectMixin
from ..util.qmp import QMPMonitor, QMPError
from .exception import ExecutionError


[docs] @target_factory.reg_driver @attr.s(eq=False) class QEMUDriver(ConsoleExpectMixin, Driver, PowerProtocol, ConsoleProtocol): """ The QEMUDriver implements an interface to start targets as qemu instances. The kernel, flash, rootfs and dtb arguments refer to images and paths declared in the environment configuration. Args: qemu_bin (str): reference to the tools key for the QEMU binary machine (str): QEMU machine type cpu (str): QEMU cpu type memory (str): QEMU memory size (ends with M or G) extra_args (str): extra QEMU arguments, they are passed directly to the QEMU binary boot_args (str): optional, additional kernel boot argument kernel (str): optional, reference to the images key for the kernel disk (str): optional, reference to the images key for the disk image disk_opts (str): optional, additional QEMU disk options flash (str): optional, reference to the images key for the flash image rootfs (str): optional, reference to the paths key for use as the virtio-9p filesystem dtb (str): optional, reference to the image key for the device tree bios (str): optional, reference to the image key for the bios image display (str, default="none"): optional, display output to enable; must be one of: none: Do not create a display device fb-headless: Create a headless framebuffer device egl-headless: Create a headless GPU-backed graphics card. Requires host support nic (str): optional, configuration string to pass to QEMU to create a network interface """ qemu_bin = attr.ib(validator=attr.validators.instance_of(str)) machine = attr.ib(validator=attr.validators.instance_of(str)) cpu = attr.ib(validator=attr.validators.instance_of(str)) memory = attr.ib(validator=attr.validators.instance_of(str)) extra_args = attr.ib(validator=attr.validators.instance_of(str)) boot_args = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) kernel = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) disk = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) disk_opts = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) rootfs = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) dtb = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) flash = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) bios = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) display = attr.ib( default="none", validator=attr.validators.optional(attr.validators.and_( attr.validators.instance_of(str), attr.validators.in_(["none", "fb-headless", "egl-headless"]), )) ) nic = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str)))
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() self.status = 0 self.txdelay = None self._child = None self._tempdir = None self._socket = None self._clientsocket = None self._forwarded_ports = {} atexit.register(self._atexit)
def _atexit(self): if not self._child: return self._child.terminate() try: self._child.communicate(timeout=1) except subprocess.TimeoutExpired: self._child.kill() self._child.communicate(timeout=1)
[docs] def get_qemu_version(self, qemu_bin): p = subprocess.run([qemu_bin, "-version"], stdout=subprocess.PIPE, encoding="utf-8") if p.returncode != 0: raise ExecutionError(f"Unable to get QEMU version. QEMU exited with: {p.returncode}") m = re.search(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<micro>\d+)', p.stdout.splitlines()[0]) if m is None: raise ExecutionError(f"Unable to find QEMU version in: {p.stdout.splitlines()[0]}") return (int(m.group('major')), int(m.group('minor')), int(m.group('micro')))
[docs] def get_qemu_base_args(self): """Returns the base command line used for Qemu without the options related to QMP. These options can be used to start an interactive Qemu manually for debugging tests """ cmd = [] qemu_bin = self.target.env.config.get_tool(self.qemu_bin) if qemu_bin is None: raise KeyError( "QEMU Binary Path not configured in tools configuration key") cmd = [qemu_bin] qemu_version = self.get_qemu_version(qemu_bin) boot_args = [] if self.kernel is not None: cmd.append("-kernel") cmd.append( self.target.env.config.get_image_path(self.kernel)) if self.disk is not None: disk_path = self.target.env.config.get_image_path(self.disk) disk_format = "raw" if disk_path.endswith(".qcow2"): disk_format = "qcow2" disk_opts = "" if self.disk_opts: disk_opts = f",{self.disk_opts}" if self.machine == "vexpress-a9": cmd.append("-drive") cmd.append( f"if=sd,format={disk_format},file={disk_path},id=mmc0{disk_opts}") boot_args.append("root=/dev/mmcblk0p1 rootfstype=ext4 rootwait") elif self.machine in ["pc", "q35", "virt"]: cmd.append("-drive") cmd.append( f"if=virtio,format={disk_format},file={disk_path}{disk_opts}") boot_args.append("root=/dev/vda rootwait") else: raise NotImplementedError( f"QEMU disk image support not implemented for machine '{self.machine}'" ) if self.rootfs is not None: cmd.append("-fsdev") cmd.append( f"local,id=rootfs,security_model=none,path={self.target.env.config.get_path(self.rootfs)}") # pylint: disable=line-too-long cmd.append("-device") cmd.append( "virtio-9p-device,fsdev=rootfs,mount_tag=/dev/root") boot_args.append("root=/dev/root rootfstype=9p rootflags=trans=virtio") if self.dtb is not None: cmd.append("-dtb") cmd.append(self.target.env.config.get_image_path(self.dtb)) if self.flash is not None: cmd.append("-drive") cmd.append( f"if=pflash,format=raw,file={self.target.env.config.get_image_path(self.flash)},id=nor0") # pylint: disable=line-too-long if self.bios is not None: cmd.append("-bios") cmd.append( self.target.env.config.get_image_path(self.bios)) if "-append" in shlex.split(self.extra_args): raise ExecutionError("-append in extra_args not allowed, use boot_args instead") cmd.extend(shlex.split(self.extra_args)) cmd.append("-machine") cmd.append(self.machine) cmd.append("-cpu") cmd.append(self.cpu) cmd.append("-m") cmd.append(self.memory) if self.display == "none": cmd.append("-nographic") elif self.display == "fb-headless": cmd.append("-display") cmd.append("none") elif self.display == "egl-headless": if qemu_version >= (6, 1, 0): cmd.append("-device") cmd.append("virtio-vga-gl") else: cmd.append("-vga") cmd.append("virtio") cmd.append("-display") cmd.append("egl-headless") else: raise ExecutionError(f"Unknown display '{self.display}'") if self.nic: cmd.append("-nic") cmd.append(self.nic) if self.boot_args is not None: boot_args.append(self.boot_args) if self.kernel is not None and boot_args: cmd.append("-append") cmd.append(" ".join(boot_args)) return cmd
[docs] def on_activate(self): self._tempdir = tempfile.mkdtemp(prefix="labgrid-qemu-tmp-") sockpath = f"{self._tempdir}/serialrw" self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._socket.bind(sockpath) self._socket.listen(0) self._cmd = self.get_qemu_base_args() self._cmd.append("-S") self._cmd.append("-qmp") self._cmd.append("stdio") self._cmd.append("-chardev") self._cmd.append(f"socket,id=serialsocket,path={sockpath}") self._cmd.append("-serial") self._cmd.append("chardev:serialsocket")
[docs] def on_deactivate(self): if self.status: self.off() if self._clientsocket: self._clientsocket.close() self._clientsocket = None self._socket.close() self._socket = None shutil.rmtree(self._tempdir)
[docs] @step() def on(self): """Start the QEMU subprocess, accept the unix socket connection and afterwards start the emulator using a QMP Command""" if self.status: return self.logger.debug("Starting with: %s", self._cmd) self._child = subprocess.Popen( self._cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) # prepare for timeout handing self._clientsocket, address = self._socket.accept() self._clientsocket.setblocking(0) self.logger.debug("new connection from %s", address) try: self.qmp = QMPMonitor(self._child.stdout, self._child.stdin) except QMPError as exc: if self._child.poll() is not None: self._child.communicate() raise IOError( f"QEMU process terminated with exit code {self._child.returncode}" ) from exc raise self.status = 1 # Restore port forwards for v in self._forwarded_ports.values(): self._add_port_forward(*v) self.monitor_command("cont")
[docs] @step() def off(self): """Stop the emulator using a monitor command and await the exitcode""" if not self.status: return self.monitor_command('quit') if self._child.wait() != 0: self._child.communicate() raise IOError self._child = None self.status = 0
[docs] def cycle(self): """Cycle the emulator by restarting it""" self.off() self.on()
[docs] @step(result=True, args=['command', 'arguments']) def monitor_command(self, command, arguments={}): """Execute a monitor_command via the QMP""" if not self.status: raise ExecutionError( "Can't use monitor command on non-running target") return self.qmp.execute(command, arguments)
def _add_port_forward(self, proto, local_address, local_port, remote_address, remote_port): self.monitor_command( "human-monitor-command", {"command-line": f"hostfwd_add {proto}:{local_address}:{local_port}-{remote_address}:{remote_port}"}, )
[docs] def add_port_forward(self, proto, local_address, local_port, remote_address, remote_port): self._add_port_forward(proto, local_address, local_port, remote_address, remote_port) self._forwarded_ports[(proto, local_address, local_port)] = (proto, local_address, local_port, remote_address, remote_port)
[docs] def remove_port_forward(self, proto, local_address, local_port): del self._forwarded_ports[(proto, local_address, local_port)] self.monitor_command( "human-monitor-command", {"command-line": f"hostfwd_remove {proto}:{local_address}:{local_port}"}, )
def _read(self, size=1, timeout=10, max_size=None): ready, _, _ = select.select([self._clientsocket], [], [], timeout) if ready: # Collect some more data time.sleep(0.01) # Always read a page, regardless of size size = 4096 size = min(max_size, size) if max_size else size res = self._clientsocket.recv(size) else: raise TIMEOUT(f"Timeout of {timeout:.2f} seconds exceeded") return res def _write(self, data): return self._clientsocket.send(data)
[docs] def __str__(self): return f"QemuDriver({self.target.name})"