Source code for labgrid.remote.common

# pylint: disable=unsubscriptable-object
import socket
import time
import enum
import random
import re
import string
from datetime import datetime
from fnmatch import fnmatchcase

import attr

# Pre-compiled patterns for tag keys and tag values. Their use is not visible
# in this module; presumably they validate the entries of ``Place.tags`` --
# verify against the callers.
#
# TAG_KEY: one lowercase ASCII letter followed by one or more lowercase
# letters, digits or underscores (so at least two characters).
TAG_KEY = re.compile(r"[a-z][a-z0-9_]+")
# TAG_VAL: zero or one lowercase letter, digit or underscore.
# NOTE(review): because of the ``?`` this pattern also matches the empty
# string, so ``TAG_VAL.match(x)`` succeeds for every input, and
# ``TAG_VAL.fullmatch(x)`` rejects values longer than one character.
# Possibly ``*`` was intended -- confirm how callers apply it.
TAG_VAL = re.compile(r"[a-z0-9_]?")

@attr.s(eq=False)
class ResourceEntry:
    """Thin wrapper around the dictionary describing a single resource.

    The wrapped ``data`` dictionary is expected to provide the keys ``cls``
    and ``params``. The bookkeeping keys ``acquired`` and ``avail`` are added
    with default values if they are missing.
    """
    data = attr.ib()  # cls, params

    def __attrs_post_init__(self):
        """Fill in the bookkeeping keys in ``data`` if they are missing."""
        self.data.setdefault('acquired', None)
        self.data.setdefault('avail', False)

    @property
    def acquired(self):
        """value stored by acquire(), or None if not acquired"""
        return self.data['acquired']

    @property
    def avail(self):
        """value of the 'avail' entry (defaults to False)"""
        return self.data['avail']

    @property
    def cls(self):
        """value of the 'cls' entry"""
        return self.data['cls']

    @property
    def params(self):
        """value of the 'params' entry (not copied)"""
        return self.data['params']

    @property
    def args(self):
        """arguments for resource construction"""
        # shallow copy, so dropping 'extra' does not modify the stored params
        args = self.data['params'].copy()
        args.pop('extra', None)
        return args

    @property
    def extra(self):
        """extra resource information"""
        return self.data['params'].get('extra', {})

    def asdict(self):
        """Return cls, params, acquired and avail as a plain dictionary."""
        return {
            'cls': self.cls,
            'params': self.params,
            'acquired': self.acquired,
            'avail': self.avail,
        }

    def update(self, data):
        """apply updated information from the exporter on the coordinator"""
        # work on a copy, so the caller's dictionary is left untouched
        data = data.copy()
        data.setdefault('acquired', None)
        data.setdefault('avail', False)
        self.data = data

    def acquire(self, place_name):
        """Record ``place_name`` as holder; the entry must not be acquired."""
        assert self.data['acquired'] is None
        self.data['acquired'] = place_name

    def release(self):
        """Clear the 'acquired' entry."""
        # ignore repeated releases
        self.data['acquired'] = None
@attr.s(eq=True, repr=False, str=False)
# This class requires eq=True, since we put the matches into a list and require
# the cmp functions to be able to remove the matches from the list later on.
class ResourceMatch:
    """Pattern selecting resources by exporter, group, class and name.

    Each component is compared with ``fnmatchcase``, i.e. as a case-sensitive
    shell-style wildcard pattern.
    """
    exporter = attr.ib()
    group = attr.ib()
    cls = attr.ib()
    name = attr.ib(default=None)
    # rename is just metadata, so don't use it for comparing matches
    rename = attr.ib(default=None, eq=False)

    @classmethod
    def fromstr(cls, pattern):
        """Create a match from 'exporter/group/cls' or 'exporter/group/cls/name'.

        Raises:
            ValueError: if the pattern does not contain two or three slashes.
        """
        if not 2 <= pattern.count("/") <= 3:
            raise ValueError(
                "invalid pattern format '{}' (use 'exporter/group/cls/name')".
                format(pattern)
            )
        return cls(*pattern.split("/"))

    def __repr__(self):
        """Return the pattern in the slash-separated form used by fromstr()."""
        result = "{}/{}/{}".format(self.exporter, self.group, self.cls)
        if self.name is not None:
            result += "/{}".format(self.name)
        return result

    def __str__(self):
        """Return the pattern, followed by ' -> <rename>' if rename is set."""
        result = repr(self)
        if self.rename:
            result += " -> " + self.rename
        return result

    def ismatch(self, resource_path):
        """Return True if this matches the given resource"""
        exporter, group, cls, name = resource_path
        if not fnmatchcase(exporter, self.exporter):
            return False
        if not fnmatchcase(group, self.group):
            return False
        if not fnmatchcase(cls, self.cls):
            return False
        # the name is only compared if this match specifies one
        if self.name and not fnmatchcase(name, self.name):
            return False
        return True
@attr.s(eq=False)
class Place:
    """A named place with its matches, tags and acquisition state."""
    name = attr.ib()
    aliases = attr.ib(default=attr.Factory(set), converter=set)
    comment = attr.ib(default="")
    tags = attr.ib(default=attr.Factory(dict))
    matches = attr.ib(default=attr.Factory(list))
    acquired = attr.ib(default=None)
    acquired_resources = attr.ib(default=attr.Factory(list))
    allowed = attr.ib(default=attr.Factory(set), converter=set)
    created = attr.ib(default=attr.Factory(time.time))
    changed = attr.ib(default=attr.Factory(time.time))
    reservation = attr.ib(default=None)

    def asdict(self):
        """Return all fields except the name as a plain dictionary.

        Sets are converted to lists, matches to dictionaries and acquired
        resources to their paths.
        """
        # in the coordinator, we have resource objects, otherwise just a path
        acquired_resources = [
            resource if isinstance(resource, (tuple, list)) else resource.path
            for resource in self.acquired_resources  # pylint: disable=not-an-iterable
        ]
        return {
            'aliases': list(self.aliases),
            'comment': self.comment,
            'tags': self.tags,
            'matches': [attr.asdict(x) for x in self.matches],
            'acquired': self.acquired,
            'acquired_resources': acquired_resources,
            'allowed': list(self.allowed),
            'created': self.created,
            'changed': self.changed,
            'reservation': self.reservation,
        }

    def update(self, config):
        """Set the attributes given in ``config`` (a field name -> value dict).

        Every key must be a field of this class. A 'name' entry is accepted
        only if it equals the current name and is otherwise skipped.
        """
        fields = attr.fields_dict(type(self))
        for k, v in config.items():
            assert k in fields
            if k == 'name':
                # we cannot rename places
                assert v == self.name
                continue
            setattr(self, k, v)

    def show(self, level=0):
        """Print a human-readable description, indented by ``level`` steps."""
        indent = ' ' * level
        if self.aliases:
            # aliases is a set; sort it so the output order is deterministic
            print(indent + "aliases: {}".format(', '.join(sorted(self.aliases))))
        if self.comment:
            print(indent + "comment: {}".format(self.comment))
        if self.tags:
            print(indent + "tags: {}".format(
                ', '.join(k+"="+v for k, v in sorted(self.tags.items()))
            ))
        print(indent + "matches:")
        for match in self.matches:  # pylint: disable=not-an-iterable
            print(indent + " {}".format(match))
        print(indent + "acquired: {}".format(self.acquired))
        print(indent + "acquired resources:")
        # in the coordinator, we have resource objects, otherwise just a path
        for resource in self.acquired_resources:  # pylint: disable=not-an-iterable
            if isinstance(resource, (tuple, list)):
                resource_path = resource
            else:
                resource_path = resource.path
            match = self.getmatch(resource_path)
            # getmatch() may return None; print the plain path in that case
            # instead of raising AttributeError on ``match.rename``
            if match is not None and match.rename:
                print(indent + " {} -> {}".format(
                    '/'.join(resource_path), match.rename))
            else:
                print(indent + " {}".format(
                    '/'.join(resource_path)))
        if self.allowed:
            # allowed is a set as well, see aliases above
            print(indent + "allowed: {}".format(', '.join(sorted(self.allowed))))
        print(indent + "created: {}".format(datetime.fromtimestamp(self.created)))
        print(indent + "changed: {}".format(datetime.fromtimestamp(self.changed)))
        if self.reservation:
            print(indent + "reservation: {}".format(self.reservation))

    def getmatch(self, resource_path):
        """Return the ResourceMatch object for the given resource path or None if not found.

        A resource_path has the structure (exporter, group, cls, name).
        """
        for match in self.matches:  # pylint: disable=not-an-iterable
            if match.ismatch(resource_path):
                return match

        return None

    def hasmatch(self, resource_path):
        """Return True if this place has a ResourceMatch object for the given resource path.

        A resource_path has the structure (exporter, group, cls, name).
        """
        return self.getmatch(resource_path) is not None

    def touch(self):
        """Set the ``changed`` timestamp to the current time."""
        self.changed = time.time()
class ReservationState(enum.Enum):
    """States of a reservation; members can be looked up by name."""
    waiting = 0
    allocated = 1
    acquired = 2
    expired = 3
    invalid = 4
@attr.s(eq=False)
class Reservation:
    """A reservation with its owner, token, state, filters and allocations."""
    owner = attr.ib(validator=attr.validators.instance_of(str))
    # ten random characters from uppercase ASCII letters and digits
    # NOTE(review): the token is drawn from ``random``, which is not a
    # cryptographically secure source; switch to ``secrets`` if tokens must
    # be unguessable.
    token = attr.ib(default=attr.Factory(
        lambda: ''.join(random.choice(string.ascii_uppercase+string.digits) for _ in range(10))))
    # accepts either a ReservationState or the name of one of its members
    state = attr.ib(
        default='waiting',
        converter=lambda x: x if isinstance(x, ReservationState) else ReservationState[x],
        validator=attr.validators.instance_of(ReservationState))
    prio = attr.ib(default=0.0, validator=attr.validators.instance_of(float))
    # a dictionary of name -> filter dicts
    filters = attr.ib(default=attr.Factory(dict),
                      validator=attr.validators.instance_of(dict))
    # a dictionary of name -> place names
    allocations = attr.ib(default=attr.Factory(dict),
                          validator=attr.validators.instance_of(dict))
    created = attr.ib(default=attr.Factory(time.time))
    # by default, the timeout is 60 seconds after creation
    timeout = attr.ib(default=attr.Factory(lambda: time.time() + 60))

    def asdict(self):
        """Return the reservation as a plain dictionary (without the token)."""
        return {
            'owner': self.owner,
            # store the member name, which the ``state`` converter accepts
            'state': self.state.name,
            'prio': self.prio,
            'filters': self.filters,
            'allocations': self.allocations,
            'created': self.created,
            'timeout': self.timeout,
        }

    def refresh(self, delta=60):
        """Move the timeout to at least ``delta`` seconds from now.

        An existing timeout that lies further in the future is kept.
        """
        self.timeout = max(self.timeout, time.time() + delta)

    @property
    def expired(self):
        """True if the timeout lies in the past"""
        return self.timeout < time.time()

    def show(self, level=0):
        """Print a human-readable description, indented by ``level`` steps."""
        indent = ' ' * level
        print(indent + "owner: {}".format(self.owner))
        print(indent + "token: {}".format(self.token))
        print(indent + "state: {}".format(self.state.name))
        if self.prio:
            print(indent + "prio: {}".format(self.prio))
        print(indent + "filters:")
        # 'fltr' instead of 'filter', to avoid shadowing the builtin
        for name, fltr in self.filters.items():
            print(indent + " {}: {}".format(name, " ".join(k+"="+v for k, v in fltr.items())))
        if self.allocations:
            print(indent + "allocations:")
            for name, allocation in self.allocations.items():
                print(indent + " {}: {}".format(name, ", ".join(allocation)))
        print(indent + "created: {}".format(datetime.fromtimestamp(self.created)))
        print(indent + "timeout: {}".format(datetime.fromtimestamp(self.timeout)))
def enable_tcp_nodelay(session):
    """
    asyncio/autobahn does not set TCP_NODELAY by default, so we need to do it
    like this for now.

    The socket is looked up via ``session._transport.transport`` and its
    ``get_extra_info('socket')``. If the transport reports no socket, nothing
    is changed.
    """
    s = session._transport.transport.get_extra_info('socket')
    if s is None:
        # no socket is exposed by the transport, so there is nothing to set
        return
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)