labgrid.util package
Subpackages
- labgrid.util.agents package
- Submodules
- labgrid.util.agents.deditec_relais8 module
- labgrid.util.agents.dummy module
- labgrid.util.agents.network_interface module
Future
BackgroundLoop
address_from_str()
connection_from_dict()
NMDev
NMDev.__init__()
NMDev.get_settings()
NMDev.get_active_settings()
NMDev.configure()
NMDev.wait_state()
NMDev.disable()
NMDev.get_state()
NMDev.request_scan()
NMDev.get_access_points()
NMDev.get_dhcpd_leases()
NMDev.__dict__
NMDev.__firstlineno__
NMDev.__module__
NMDev.__static_attributes__
NMDev.__weakref__
handle_configure()
handle_wait_state()
handle_disable()
handle_get_active_settings()
handle_get_settings()
handle_get_state()
handle_get_dhcpd_leases()
handle_request_scan()
handle_get_access_points()
- labgrid.util.agents.sysfsgpio module
- labgrid.util.agents.udisks2 module
- labgrid.util.agents.usb_hid_relay module
Submodules
labgrid.util.agent module
- class labgrid.util.agent.Agent[source]
Bases:
object
- __firstlineno__ = 16
- __module__ = 'labgrid.util.agent'
- __static_attributes__ = ('methods', 'stdin', 'stdout')
- __weakref__
list of weak references to the object
labgrid.util.agentwrapper module
- exception labgrid.util.agentwrapper.AgentError[source]
Bases:
Exception
- __firstlineno__ = 17
- __module__ = 'labgrid.util.agentwrapper'
- __static_attributes__ = ()
- __weakref__
list of weak references to the object
- exception labgrid.util.agentwrapper.AgentException[source]
Bases:
Exception
- __firstlineno__ = 20
- __module__ = 'labgrid.util.agentwrapper'
- __static_attributes__ = ()
- __weakref__
list of weak references to the object
- class labgrid.util.agentwrapper.MethodProxy(wrapper, name)[source]
Bases:
object
- __firstlineno__ = 23
- __module__ = 'labgrid.util.agentwrapper'
- __static_attributes__ = ('name', 'wrapper')
- __weakref__
list of weak references to the object
- class labgrid.util.agentwrapper.ModuleProxy(wrapper, name)[source]
Bases:
object
- __firstlineno__ = 31
- __module__ = 'labgrid.util.agentwrapper'
- __static_attributes__ = ('name', 'wrapper')
- __weakref__
list of weak references to the object
- class labgrid.util.agentwrapper.AgentWrapper(host=None)[source]
Bases:
object
- __firstlineno__ = 39
- __module__ = 'labgrid.util.agentwrapper'
- __static_attributes__ = ('agent', 'loaded', 'logger')
- __weakref__
list of weak references to the object
labgrid.util.atomic module
labgrid.util.dict module
This module contains helper functions for working with dictionaries.
- labgrid.util.dict.diff_dict(old, new)[source]
Compares old and new dictionaries, yielding for each difference (key, old_value, new_value). None is used for missing values.
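The behaviour described above can be illustrated with a minimal stand-in. This is a sketch written from the description only, not labgrid's actual implementation; the name diff_dict_sketch and the sorted key order are assumptions made for the example.

```python
def diff_dict_sketch(old, new):
    """Yield (key, old_value, new_value) for every key whose value differs.

    Hypothetical stand-in for labgrid.util.dict.diff_dict, based only on
    its documented behaviour. None stands in for a missing value.
    """
    # Walk the union of keys; sorting (an assumption) gives a stable order.
    for key in sorted(old.keys() | new.keys()):
        old_value = old.get(key)
        new_value = new.get(key)
        if old_value != new_value:
            yield key, old_value, new_value


changes = list(diff_dict_sketch({"a": 1, "b": 2}, {"b": 3, "c": 4}))
# changes == [("a", 1, None), ("b", 2, 3), ("c", None, 4)]
```

Because None marks a missing value, a key that is explicitly set to None on one side cannot be told apart from a key that is absent on that side.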
labgrid.util.exceptions module
- exception labgrid.util.exceptions.NoValidDriverError(msg)[source]
Bases:
Exception
- __firstlineno__ = 4
- __init__(msg) None
Method generated by attrs for class NoValidDriverError.
- __match_args__ = ('msg',)
- __module__ = 'labgrid.util.exceptions'
- __replace__(**changes)
Method generated by attrs for class NoValidDriverError.
- __repr__()
Method generated by attrs for class NoValidDriverError.
- __static_attributes__ = ()
- __weakref__
list of weak references to the object
labgrid.util.expect module
- class labgrid.util.expect.PtxExpect(driver)[source]
Bases:
spawn
labgrid wrapper around the pexpect module.
This class provides pexpect functionality for the ConsoleProtocol classes.
driver: ConsoleProtocol object to be passed in
- read_nonblocking(size=1, timeout=-1)[source]
pexpect needs a non-blocking read function; this one simply uses pyserial with a timeout of 0.
- __firstlineno__ = 6
- __module__ = 'labgrid.util.expect'
- __static_attributes__ = ('driver', 'linesep')
labgrid.util.helper module
- class labgrid.util.helper.ProcessWrapper(callbacks=NOTHING)[source]
Bases:
object
- loglevel = 20
- check_output(command, *, print_on_silent_log=False, input=None, stdin=None)[source]
Run a command and supply the output to callback functions
- __eq__(other)
Method generated by attrs for class ProcessWrapper.
- __firstlineno__ = 37
- __ge__(other)
Method generated by attrs for class ProcessWrapper.
- __gt__(other)
Method generated by attrs for class ProcessWrapper.
- __hash__ = None
- __init__(callbacks=NOTHING) None
Method generated by attrs for class ProcessWrapper.
- __le__(other)
Method generated by attrs for class ProcessWrapper.
- __lt__(other)
Method generated by attrs for class ProcessWrapper.
- __match_args__ = ('callbacks',)
- __module__ = 'labgrid.util.helper'
- __ne__(other)
Check equality and either forward a NotImplemented or return the result negated.
- __replace__(**changes)
Method generated by attrs for class ProcessWrapper.
- __repr__()
Method generated by attrs for class ProcessWrapper.
- __static_attributes__ = ()
- __weakref__
list of weak references to the object
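The idea behind ProcessWrapper.check_output, running a command and handing its output to callback functions, can be sketched with the standard library alone. The function name check_output_with_callbacks and the one-argument callback signature are assumptions made for this example; labgrid's own callbacks may take different arguments.

```python
import subprocess
import sys


def check_output_with_callbacks(command, callbacks):
    """Run command, pass each output line to every callback, return all lines.

    Hypothetical sketch; not labgrid's ProcessWrapper implementation.
    """
    collected = []
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so callbacks see everything
        text=True,
    ) as proc:
        for line in proc.stdout:
            line = line.rstrip("\n")
            collected.append(line)
            for callback in callbacks:
                callback(line)
    # Leaving the "with" block waits for the process, so returncode is set.
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(
            proc.returncode, command, output="\n".join(collected)
        )
    return collected


seen = []
lines = check_output_with_callbacks(
    [sys.executable, "-c", "print('one'); print('two')"],
    [seen.append],
)
```

Each callback sees the output line by line while the command is still running, which is what makes this pattern useful for live logging or printing.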
labgrid.util.managedfile module
- exception labgrid.util.managedfile.ManagedFileError[source]
Bases:
Exception
- __firstlineno__ = 15
- __module__ = 'labgrid.util.managedfile'
- __static_attributes__ = ()
- __weakref__
list of weak references to the object
- class labgrid.util.managedfile.ManagedFile(local_path, resource, detect_nfs=True)[source]
Bases:
object
The ManagedFile allows the synchronisation of a file to a remote host. It has to be created with the file to be synced and the target resource as arguments:
    from labgrid.util.managedfile import ManagedFile
    ManagedFile("/tmp/examplefile", <your-resource>)
Synchronisation is done with the sync_to_resource method.
- sync_to_resource(symlink=None)[source]
Sync the file to the host specified in a resource.
- Raises:
ExecutionError – if the SSH connection/copy fails
- get_remote_path()[source]
Retrieve the remote file path
- Returns:
path to the file on the remote host
- Return type:
str
- get_hash()[source]
Retrieve the hash of the file
- Returns:
SHA256 hexdigest of the file
- Return type:
str
- __eq__(other)
Method generated by attrs for class ManagedFile.
- __firstlineno__ = 19
- __ge__(other)
Method generated by attrs for class ManagedFile.
- __gt__(other)
Method generated by attrs for class ManagedFile.
- __hash__ = None
- __init__(local_path, resource, detect_nfs=True) None
Method generated by attrs for class ManagedFile.
- __le__(other)
Method generated by attrs for class ManagedFile.
- __lt__(other)
Method generated by attrs for class ManagedFile.
- __match_args__ = ('local_path', 'resource', 'detect_nfs')
- __module__ = 'labgrid.util.managedfile'
- __ne__(other)
Check equality and either forward a NotImplemented or return the result negated.
- __replace__(**changes)
Method generated by attrs for class ManagedFile.
- __repr__()
Method generated by attrs for class ManagedFile.
- __static_attributes__ = ('_on_nfs_cached', 'hash', 'logger', 'rpath')
- __weakref__
list of weak references to the object
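ManagedFile.get_hash is documented above as returning the SHA256 hexdigest of the file. The following sketch shows one way such a digest can be computed with hashlib. The helper name sha256_hexdigest and the chunk size are assumptions for illustration, and this is not labgrid's code.

```python
import hashlib
import os
import tempfile


def sha256_hexdigest(path, chunk_size=65536):
    """Return the SHA256 hexdigest of the file at path (hypothetical helper)."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in chunks so large files do not have to fit into memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Write a small temporary file and hash it.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello labgrid\n")
    example_path = tmp.name

example_hash = sha256_hexdigest(example_path)
os.remove(example_path)
```

A content hash like this lets the same file be recognised across hosts regardless of its local path or modification time.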
labgrid.util.marker module
labgrid.util.proxy module
labgrid.util.qmp module
- class labgrid.util.qmp.QMPMonitor(monitor_out, monitor_in)[source]
Bases:
object
- __firstlineno__ = 6
- __init__(monitor_out, monitor_in) None
Method generated by attrs for class QMPMonitor.
- __match_args__ = ('monitor_out', 'monitor_in')
- __module__ = 'labgrid.util.qmp'
- __replace__(**changes)
Method generated by attrs for class QMPMonitor.
- __repr__()
Method generated by attrs for class QMPMonitor.
- __static_attributes__ = ('logger',)
- __weakref__
list of weak references to the object
- exception labgrid.util.qmp.QMPError(msg)[source]
Bases:
Exception
- __firstlineno__ = 50
- __init__(msg) None
Method generated by attrs for class QMPError.
- __match_args__ = ('msg',)
- __module__ = 'labgrid.util.qmp'
- __replace__(**changes)
Method generated by attrs for class QMPError.
- __repr__()
Method generated by attrs for class QMPError.
- __static_attributes__ = ()
- __weakref__
list of weak references to the object
labgrid.util.snmp module
labgrid.util.ssh module
- class labgrid.util.ssh.SSHConnection(host)[source]
Bases:
object
SSHConnections are individual connections to hosts managed by a control socket. In addition to command execution this class also provides an interface to manage port forwardings. These are used in the remote infrastructure to tunnel multiple connections over one SSH link.
A public identity infrastructure is assumed; no extra usernames or passwords are supported.
- run(command, *, codec='utf-8', decodeerrors='strict', force_tty=False, stderr_merge=False, stderr_loglevel=None, stdout_loglevel=None)[source]
Run a command over the SSHConnection
- Parameters:
command (string) – The command to run
codec (string, optional) – output encoding. Defaults to “utf-8”.
decodeerrors (string, optional) – behavior on decode errors. Defaults to “strict”. Refer to stdtypes’ bytes.decode for details.
force_tty (bool, optional) – force allocate a tty (ssh -tt). Defaults to False
stderr_merge (bool, optional) – merge ssh subprocess stderr into stdout. Defaults to False.
stdout_loglevel (int, optional) – log stdout with specific log level as well. Defaults to None, i.e. don’t log.
stderr_loglevel (int, optional) – log stderr with specific log level as well. Defaults to None, i.e. don’t log.
- Returns:
(stdout, stderr, returncode)
- run_check(command, *, codec='utf-8', decodeerrors='strict', force_tty=False, stderr_merge=False, stderr_loglevel=None, stdout_loglevel=None)[source]
Runs a command over the SSHConnection and returns the output if successful; raises ExecutionError otherwise.
Except for the means of returning the value, this is equivalent to run.
- Parameters:
command (string) – The command to run
codec (string, optional) – output encoding. Defaults to “utf-8”.
decodeerrors (string, optional) – behavior on decode errors. Defaults to “strict”. Refer to stdtypes’ bytes.decode for details.
force_tty (bool, optional) – force allocate a tty (ssh -tt). Defaults to False
stderr_merge (bool, optional) – merge ssh subprocess stderr into stdout. Defaults to False.
stdout_loglevel (int, optional) – log stdout with specific log level as well. Defaults to None, i.e. don’t log.
stderr_loglevel (int, optional) – log stderr with specific log level as well. Defaults to None, i.e. don’t log.
- Returns:
stdout of the executed command if successful; otherwise an ExecutionError is raised
- Return type:
List[str]
- add_remote_port_forward(remote_port, local_port, remote_bind=None)[source]
Set up a remote port forward.
Note that the remote socket is not bound to any specific IP by default, making it reachable by the target. Also, ‘GatewayPorts clientspecified’ needs to be configured in the remote host’s sshd_config.
- remove_remote_port_forward(remote_port, local_port, remote_bind=None)[source]
Cancel a remote port forward.
- __eq__(other)
Method generated by attrs for class SSHConnection.
- __firstlineno__ = 123
- __ge__(other)
Method generated by attrs for class SSHConnection.
- __gt__(other)
Method generated by attrs for class SSHConnection.
- __hash__ = None
- __init__(host) None
Method generated by attrs for class SSHConnection.
- __le__(other)
Method generated by attrs for class SSHConnection.
- __lt__(other)
Method generated by attrs for class SSHConnection.
- __match_args__ = ('host',)
- __module__ = 'labgrid.util.ssh'
- __ne__(other)
Check equality and either forward a NotImplemented or return the result negated.
- __replace__(**changes)
Method generated by attrs for class SSHConnection.
- __repr__()
Method generated by attrs for class SSHConnection.
- __static_attributes__ = ('_connected', '_keepalive', '_logger', '_master', '_socket')
- __weakref__
list of weak references to the object
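The relationship between run and run_check documented above (same execution, different way of reporting the result) can be sketched locally. The example runs commands on the local machine with subprocess instead of over SSH. The names run_local and run_check_local, the stand-in ExecutionError class, and returning stdout and stderr as lists of lines are all assumptions made for illustration.

```python
import subprocess
import sys


class ExecutionError(Exception):
    """Stand-in for labgrid's ExecutionError (assumption for this sketch)."""


def run_local(command):
    """Return (stdout, stderr, returncode), mirroring the documented run()."""
    proc = subprocess.run(command, capture_output=True, text=True)
    return proc.stdout.splitlines(), proc.stderr.splitlines(), proc.returncode


def run_check_local(command):
    """Return stdout on success and raise otherwise, like run_check()."""
    stdout, stderr, returncode = run_local(command)
    if returncode != 0:
        raise ExecutionError(f"{command!r} failed with exit code {returncode}")
    return stdout


ok_output = run_check_local([sys.executable, "-c", "print('ok')"])
failed = run_local([sys.executable, "-c", "import sys; sys.exit(3)"])

try:
    run_check_local([sys.executable, "-c", "import sys; sys.exit(3)"])
    raised = False
except ExecutionError:
    raised = True
```

Callers that need to inspect a non-zero exit code use the run style; callers that only care about successful output use the run_check style and let the exception propagate.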
- exception labgrid.util.ssh.ForwardError(msg)[source]
Bases:
Exception
- __eq__(other)
Method generated by attrs for class ForwardError.
- __firstlineno__ = 547
- __ge__(other)
Method generated by attrs for class ForwardError.
- __gt__(other)
Method generated by attrs for class ForwardError.
- __hash__ = None
- __init__(msg) None
Method generated by attrs for class ForwardError.
- __le__(other)
Method generated by attrs for class ForwardError.
- __lt__(other)
Method generated by attrs for class ForwardError.
- __match_args__ = ('msg',)
- __module__ = 'labgrid.util.ssh'
- __ne__(other)
Check equality and either forward a NotImplemented or return the result negated.
- __replace__(**changes)
Method generated by attrs for class ForwardError.
- __repr__()
Method generated by attrs for class ForwardError.
- __static_attributes__ = ()
- __weakref__
list of weak references to the object
labgrid.util.timeout module
- class labgrid.util.timeout.Timeout(timeout=120.0)[source]
Bases:
object
Represents a timeout (as a deadline)
- property remaining
- property expired
- __firstlineno__ = 6
- __init__(timeout=120.0) None
Method generated by attrs for class Timeout.
- __match_args__ = ('timeout',)
- __module__ = 'labgrid.util.timeout'
- __replace__(**changes)
Method generated by attrs for class Timeout.
- __repr__()
Method generated by attrs for class Timeout.
- __static_attributes__ = ('_deadline',)
- __weakref__
list of weak references to the object
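A timeout stored as a deadline, with remaining and expired properties as listed above, might look roughly like the following. This is a minimal sketch and not labgrid's implementation: the class name DeadlineTimeout, the use of time.monotonic, and the clamping of remaining to zero are assumptions.

```python
import time


class DeadlineTimeout:
    """Hypothetical deadline-based timeout, modelled on the documented API."""

    def __init__(self, timeout=120.0):
        if timeout < 0.0:
            raise ValueError("timeout must not be negative")
        self.timeout = timeout
        # Store the absolute deadline once; a monotonic clock is not
        # affected by changes to the system time.
        self._deadline = time.monotonic() + timeout

    @property
    def remaining(self):
        """Seconds left until the deadline, never below zero."""
        return max(self._deadline - time.monotonic(), 0.0)

    @property
    def expired(self):
        """True once the deadline has passed."""
        return time.monotonic() >= self._deadline


long_timeout = DeadlineTimeout(60.0)
zero_timeout = DeadlineTimeout(0.0)
```

Storing the deadline rather than a start time means one object can be passed through several nested waits, each of which only needs to ask how much time is left.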
labgrid.util.version module
This module contains helper functions for working with versions.
labgrid.util.yaml module
This module contains the custom YAML load and dump functions and the associated loader and dumper classes.
- class labgrid.util.yaml.Loader(stream)[source]
Bases:
SafeLoader
- __firstlineno__ = 13
- __module__ = 'labgrid.util.yaml'
- __static_attributes__ = ()
- yaml_constructors = {'!template': <function _template_constructor>, 'tag:yaml.org,2002:binary': <function SafeConstructor.construct_yaml_binary>, 'tag:yaml.org,2002:bool': <function SafeConstructor.construct_yaml_bool>, 'tag:yaml.org,2002:float': <function SafeConstructor.construct_yaml_float>, 'tag:yaml.org,2002:int': <function SafeConstructor.construct_yaml_int>, 'tag:yaml.org,2002:map': <function _dict_constructor>, 'tag:yaml.org,2002:null': <function SafeConstructor.construct_yaml_null>, 'tag:yaml.org,2002:omap': <function SafeConstructor.construct_yaml_omap>, 'tag:yaml.org,2002:pairs': <function SafeConstructor.construct_yaml_pairs>, 'tag:yaml.org,2002:python/tuple': <function FullConstructor.construct_python_tuple>, 'tag:yaml.org,2002:seq': <function SafeConstructor.construct_yaml_seq>, 'tag:yaml.org,2002:set': <function SafeConstructor.construct_yaml_set>, 'tag:yaml.org,2002:str': <function _str_constructor>, 'tag:yaml.org,2002:timestamp': <function SafeConstructor.construct_yaml_timestamp>, None: <function SafeConstructor.construct_undefined>}
- class labgrid.util.yaml.Dumper(stream, default_style=None, default_flow_style=False, canonical=None, indent=None, width=None, allow_unicode=None, line_break=None, encoding=None, explicit_start=None, explicit_end=None, version=None, tags=None, sort_keys=True)[source]
Bases:
SafeDumper
- __annotations__ = {}
- __firstlineno__ = 17
- __module__ = 'labgrid.util.yaml'
- __static_attributes__ = ()
- yaml_representers = {<class 'NoneType'>: <function SafeRepresenter.represent_none>, <class 'bool'>: <function SafeRepresenter.represent_bool>, <class 'bytes'>: <function SafeRepresenter.represent_binary>, <class 'collections.OrderedDict'>: <function _dict_representer>, <class 'datetime.date'>: <function SafeRepresenter.represent_date>, <class 'datetime.datetime'>: <function SafeRepresenter.represent_datetime>, <class 'dict'>: <function SafeRepresenter.represent_dict>, <class 'float'>: <function SafeRepresenter.represent_float>, <class 'int'>: <function SafeRepresenter.represent_int>, <class 'list'>: <function SafeRepresenter.represent_list>, <class 'set'>: <function SafeRepresenter.represent_set>, <class 'str'>: <function SafeRepresenter.represent_str>, <class 'tuple'>: <function SafeRepresenter.represent_list>, None: <function SafeRepresenter.represent_undefined>}