Source code for labgrid.remote.common

import socket
import time
import enum
import random
import re
import string
from datetime import datetime
from fnmatch import fnmatchcase

import attr

__all__ = [
    'TAG_KEY',
    'TAG_VAL',
    'ResourceEntry',
    'ResourceMatch',
    'Place',
    'ReservationState',
    'Reservation',
    'enable_tcp_nodelay',
    'monkey_patch_max_msg_payload_size_ws_option',
]

TAG_KEY = re.compile(r"[a-z][a-z0-9_]+")
TAG_VAL = re.compile(r"[a-z0-9_]?")


[docs] @attr.s(eq=False) class ResourceEntry: data = attr.ib() # cls, params
[docs] def __attrs_post_init__(self): self.data.setdefault('acquired', None) self.data.setdefault('avail', False)
@property def acquired(self): return self.data['acquired'] @property def avail(self): return self.data['avail'] @property def cls(self): return self.data['cls'] @property def params(self): return self.data['params'] @property def args(self): """arguments for resource construction""" args = self.data['params'].copy() args.pop('extra', None) return args @property def extra(self): """extra resource information""" return self.data['params'].get('extra', {})
[docs] def asdict(self): return { 'cls': self.cls, 'params': self.params, 'acquired': self.acquired, 'avail': self.avail, }
[docs] def update(self, data): """apply updated information from the exporter on the coordinator""" data = data.copy() data.setdefault('acquired', None) data.setdefault('avail', False) self.data = data
[docs] def acquire(self, place_name): assert self.data['acquired'] is None self.data['acquired'] = place_name
[docs] def release(self): # ignore repeated releases self.data['acquired'] = None
[docs] @attr.s(eq=True, repr=False, str=False) # This class requires eq=True, since we put the matches into a list and require # the cmp functions to be able to remove the matches from the list later on. class ResourceMatch: exporter = attr.ib() group = attr.ib() cls = attr.ib() name = attr.ib(default=None) # rename is just metadata, so don't use it for comparing matches rename = attr.ib(default=None, eq=False)
[docs] @classmethod def fromstr(cls, pattern): if not 2 <= pattern.count("/") <= 3: raise ValueError( f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')" ) return cls(*pattern.split("/"))
[docs] def __repr__(self): result = f"{self.exporter}/{self.group}/{self.cls}" if self.name is not None: result += f"/{self.name}" return result
[docs] def __str__(self): result = repr(self) if self.rename: result += " -> " + self.rename return result
[docs] def ismatch(self, resource_path): """Return True if this matches the given resource""" try: exporter, group, cls, name = resource_path except ValueError: exporter, group, cls = resource_path name = None if not fnmatchcase(exporter, self.exporter): return False if not fnmatchcase(group, self.group): return False if not fnmatchcase(cls, self.cls): return False if name and self.name and not fnmatchcase(name, self.name): return False return True
[docs] @attr.s(eq=False) class Place: name = attr.ib() aliases = attr.ib(default=attr.Factory(set), converter=set) comment = attr.ib(default="") tags = attr.ib(default=attr.Factory(dict)) matches = attr.ib(default=attr.Factory(list)) acquired = attr.ib(default=None) acquired_resources = attr.ib(default=attr.Factory(list)) allowed = attr.ib(default=attr.Factory(set), converter=set) created = attr.ib(default=attr.Factory(time.time)) changed = attr.ib(default=attr.Factory(time.time)) reservation = attr.ib(default=None)
[docs] def asdict(self): # in the coordinator, we have resource objects, otherwise just a path acquired_resources = [] for resource in self.acquired_resources: if isinstance(resource, (tuple, list)): acquired_resources.append(resource) else: acquired_resources.append(resource.path) return { 'aliases': list(self.aliases), 'comment': self.comment, 'tags': self.tags, 'matches': [attr.asdict(x) for x in self.matches], 'acquired': self.acquired, 'acquired_resources': acquired_resources, 'allowed': list(self.allowed), 'created': self.created, 'changed': self.changed, 'reservation': self.reservation, }
[docs] def update(self, config): fields = attr.fields_dict(type(self)) for k, v in config.items(): assert k in fields if k == 'name': # we cannot rename places assert v == self.name continue setattr(self, k, v)
[docs] def show(self, level=0): indent = ' ' * level if self.aliases: print(indent + f"aliases: {', '.join(sorted(self.aliases))}") if self.comment: print(indent + f"comment: {self.comment}") if self.tags: print(indent + f"tags: {', '.join(k + '=' + v for k, v in sorted(self.tags.items()))}") print(indent + "matches:") for match in sorted(self.matches): print(indent + f" {match}") print(indent + f"acquired: {self.acquired}") print(indent + "acquired resources:") # in the coordinator, we have resource objects, otherwise just a path for resource in sorted(self.acquired_resources): if isinstance(resource, (tuple, list)): resource_path = resource else: resource_path = resource.path match = self.getmatch(resource_path) if match.rename: print(indent + f" {'/'.join(resource_path)} -> {match.rename}") else: print(indent + f" {'/'.join(resource_path)}") if self.allowed: print(indent + f"allowed: {', '.join(self.allowed)}") print(indent + f"created: {datetime.fromtimestamp(self.created)}") print(indent + f"changed: {datetime.fromtimestamp(self.changed)}") if self.reservation: print(indent + f"reservation: {self.reservation}")
[docs] def getmatch(self, resource_path): """Return the ResourceMatch object for the given resource path or None if not found. A resource_path has the structure (exporter, group, cls, name). """ for match in self.matches: if match.ismatch(resource_path): return match return None
[docs] def hasmatch(self, resource_path): """Return True if this place as a ResourceMatch object for the given resource path. A resource_path has the structure (exporter, group, cls, name). """ return self.getmatch(resource_path) is not None
[docs] def unmatched(self, resource_paths): """Returns a match which could not be matched to the list of resource_path A resource_path has the structure (exporter, group, cls, name). """ for match in self.matches: if not any([match.ismatch(resource) for resource in resource_paths]): return match
[docs] def touch(self): self.changed = time.time()
[docs] class ReservationState(enum.Enum): waiting = 0 allocated = 1 acquired = 2 expired = 3 invalid = 4
[docs] @attr.s(eq=False) class Reservation: owner = attr.ib(validator=attr.validators.instance_of(str)) token = attr.ib(default=attr.Factory( lambda: ''.join(random.choice(string.ascii_uppercase+string.digits) for i in range(10)))) state = attr.ib( default='waiting', converter=lambda x: x if isinstance(x, ReservationState) else ReservationState[x], validator=attr.validators.instance_of(ReservationState)) prio = attr.ib(default=0.0, validator=attr.validators.instance_of(float)) # a dictionary of name -> filter dicts filters = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict)) # a dictionary of name -> place names allocations = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict)) created = attr.ib(default=attr.Factory(time.time)) timeout = attr.ib(default=attr.Factory(lambda: time.time() + 60))
[docs] def asdict(self): return { 'owner': self.owner, 'state': self.state.name, 'prio': self.prio, 'filters': self.filters, 'allocations': self.allocations, 'created': self.created, 'timeout': self.timeout, }
[docs] def refresh(self, delta=60): self.timeout = max(self.timeout, time.time() + delta)
@property def expired(self): return self.timeout < time.time()
[docs] def show(self, level=0): indent = ' ' * level print(indent + f"owner: {self.owner}") print(indent + f"token: {self.token}") print(indent + f"state: {self.state.name}") if self.prio: print(indent + f"prio: {self.prio}") print(indent + "filters:") for name, fltr in self.filters.items(): print(indent + f" {name}: {' '.join([(k + '=' + v) for k, v in fltr.items()])}") if self.allocations: print(indent + "allocations:") for name, allocation in self.allocations.items(): print(indent + f" {name}: {', '.join(allocation)}") print(indent + f"created: {datetime.fromtimestamp(self.created)}") print(indent + f"timeout: {datetime.fromtimestamp(self.timeout)}")
[docs] def enable_tcp_nodelay(session): """ asyncio/autobahn does not set TCP_NODELAY by default, so we need to do it like this for now. """ s = session._transport.transport.get_extra_info('socket') s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
[docs] def monkey_patch_max_msg_payload_size_ws_option(): """ The default maxMessagePayloadSize in autobahn is 1M. For larger setups with a big number of exported resources, this becomes the limiting factor. Increase maxMessagePayloadSize in WampWebSocketClientFactory.setProtocolOptions() by monkey patching it, so autobahn.asyncio.wamp.ApplicationRunner effectively sets the increased value. This function must be called before ApplicationRunner is instanciated. """ from autobahn.asyncio.websocket import WampWebSocketClientFactory original_method = WampWebSocketClientFactory.setProtocolOptions def set_protocol_options(*args, **kwargs): new_max_message_payload_size = 10485760 # maxMessagePayloadSize given as positional arg args = list(args) try: args[9] = max((args[9], new_max_message_payload_size)) except IndexError: pass # maxMessagePayloadSize given as kwarg kwarg_name = "maxMessagePayloadSize" if kwarg_name in kwargs and kwargs[kwarg_name] is not None: kwargs[kwarg_name] = max((kwargs[kwarg_name], new_max_message_payload_size)) return original_method(*args, **kwargs) WampWebSocketClientFactory.setProtocolOptions = set_protocol_options