Source code for labgridhelper.linux

import json
import re

from labgrid.protocol import CommandProtocol
from labgrid.driver import ExecutionError

[docs]def get_systemd_version(command): """Returns systemd version retrieved by parsing output of `systemd --version` Args: command (CommandProtocol): An instance of a Driver implementing the CommandProtocol Returns: int: systemd version number """ assert isinstance(command, CommandProtocol), "command must be a CommandProtocol" out = command.run_check("systemctl --version") out = out[0] parsed = re.search(r'^systemd\s+(?P<version>\d+)', out) if not parsed: raise ValueError("Systemd version output changed") return int(parsed.group("version"))
[docs]def get_systemd_status(command): """Returns parsed output of systemd Manager's ListUnits DBus command Args: command (CommandProtocol): An instance of a Driver implementing the CommandProtocol Returns: dict: dictionary of service names to their properties """ assert isinstance(command, CommandProtocol), "command must be a CommandProtocol" array_notation = "a(ssssssouso)" def get_systemd_status_json(command): out = command.run_check( "busctl call --json=short --no-pager org.freedesktop.systemd1 \ /org/freedesktop/systemd1 org.freedesktop.systemd1.Manager ListUnits" ) out = out[0] out = json.loads(out) if out["type"] != array_notation: raise ValueError("Systemd ListUnits output changed") services = {} for record in out["data"][0]: data = iter(record) name = next(data) services[name] = {} services[name]["description"] = next(data) services[name]["load"] = next(data) services[name]["active"] = next(data) services[name]["sub"] = next(data) services[name]["follow"] = next(data) services[name]["path"] = next(data) services[name]["id"] = int(next(data)) services[name]["type"] = next(data) services[name]["objpath"] = next(data) return services def get_systemd_status_raw(command): out = command.run_check( "busctl call --no-pager org.freedesktop.systemd1 \ /org/freedesktop/systemd1 org.freedesktop.systemd1.Manager ListUnits" ) out = out[0] if array_notation not in out: raise ValueError("Systemd ListUnits output changed") out = out[len(array_notation):] array_length = int(out[:out.index('"')].strip(" ")) out = out[out.index('"')+1:-1] out = out.split('\" \"') data = iter(out) services = {} for _ in range(array_length): name = next(data) services[name] = {} services[name]["description"] = next(data) services[name]["load"] = next(data) services[name]["active"] = next(data) services[name]["sub"] = next(data) services[name]["follow"] = next(data) path_and_id = next(data) pos = path_and_id.index('"') services[name]["path"] = path_and_id[:pos] services[name]["id"] = int(path_and_id[pos+1:-1].strip(" ")) services[name]["type"] = path_and_id[path_and_id.rfind('"'):] services[name]["objpath"] = next(data) return services if get_systemd_version(command) > 239: return get_systemd_status_json(command) else: return get_systemd_status_raw(command)
[docs]def get_commands(command, directories=None): """Returns the commands of a running linux system Args: command (CommandProtocol): An instance of a Driver implementing the CommandProtocol directories (list): An optional list of directories to include Returns: list: list of commands available under linux """ assert isinstance(command, CommandProtocol), "command must be a CommandProtocol" out = command.run_check("ls /usr/bin") out.extend(command.run_check("ls /bin")) out.extend(command.run_check("ls /usr/sbin")) out.extend(command.run_check("ls /sbin")) if directories: assert isinstance(directories, list), "directories must be a list" for directory in directories: out.extend(command.run_check("ls {}".format(directory))) commands = set() for line in out: for cmd in line.split(" "): if cmd: commands.add(cmd) return commands
[docs]def get_systemd_service_active(command, service): """Returns True if service is active, False in all other cases Args: command (CommandProtocol): An instance of a Driver implementing the CommandProtocol service (str): name of the service Returns: bool: True if service is active, False otherwise """ assert isinstance(command, CommandProtocol), "command must be a CommandProtocol" _, _, exitcode = command.run( "systemctl --quiet is-active {}".format(service) ) return exitcode == 0
[docs]def get_interface_ip(command, interface="eth0"): """Returns the global valid IPv4 address of the supplied interface Args: command (CommandProtocol): An instance of a Driver implementing the CommandProtocol interface (string): name of the interface Returns: str: IPv4 address of the interface, None otherwise """ assert isinstance(command, CommandProtocol), "command must be a CommandProtocol" try: ip_string = command.run_check("ip -o -4 addr show") except ExecutionError: return None regex = re.compile( r"""\d+: # Match the leading number \s+(?P<if>\w+) # Match whitespace and interfacename \s+inet\s+(?P<ip>[\d.]+) # Match IP Adress /(?P<prefix>\d+) # Match prefix .*global # Match global scope, not host scope""", re.X ) result = {} for line in ip_string: match = regex.match(line) if match: match = match.groupdict() result[match['if']] = match['ip'] if result: return result[interface] return None
[docs]def get_hostname(command): """Returns the hostname Args: command (CommandProtocol): An instance of a Driver implementing the CommandProtocol Returns: str: hostname of the target, None otherwise """ assert isinstance(command, CommandProtocol), "command must be a CommandProtocol" try: hostname_string = command.run_check("hostname") except ExecutionError: return None return hostname_string[0]
[docs]def systemd_unit_properties(command, unit_properties, unit=''): """Yields the values of the properties of a unit Args: command (CommandProtocol): An instance of a Driver implementing the CommandProtocol unit_properties (iterable): Names of the properties of interest unit (str, optional): The systemd unit of interest, defaults to empty (systemd manager itself) Yields: str: Property value of the unit """ assert isinstance(command, CommandProtocol), "command must be a CommandProtocol" for unit_property in unit_properties: yield command.run_check('systemctl --property={property} --no-pager --value show {unit}'.format( property=unit_property, unit=unit))[0]