# pylint: disable=unsubscriptable-object
import socket
import time
import enum
import random
import re
import string
from datetime import datetime
from fnmatch import fnmatchcase
import attr
# Pattern for tag keys: one lowercase letter followed by one or more lowercase
# letters, digits or underscores (so at least two characters in total).
TAG_KEY = re.compile(r"[a-z][a-z0-9_]+")
# Pattern for tag values: at most one lowercase letter, digit or underscore.
# NOTE(review): because of the `?` quantifier this pattern also matches the
# empty string, so an unanchored `.match()` succeeds on any input; a `*`/`+`
# quantifier may have been intended -- confirm against the callers.
TAG_VAL = re.compile(r"[a-z0-9_]?")
@attr.s(eq=False)
class ResourceEntry:
    """Thin wrapper around the raw ``data`` dict describing one resource.

    The dict carries the keys ``cls`` and ``params`` plus the bookkeeping
    keys ``acquired`` and ``avail``, which are filled in with defaults when
    they are missing.
    """
    data = attr.ib()  # cls, params

    def __attrs_post_init__(self):
        """Ensure the bookkeeping keys exist in the wrapped dict."""
        for key, fallback in (('acquired', None), ('avail', False)):
            self.data.setdefault(key, fallback)

    @property
    def acquired(self):
        """value stored under ``acquired`` (``None`` when not acquired)"""
        return self.data['acquired']

    @property
    def avail(self):
        """value stored under ``avail``"""
        return self.data['avail']

    @property
    def cls(self):
        """value stored under ``cls``"""
        return self.data['cls']

    @property
    def params(self):
        """value stored under ``params``"""
        return self.data['params']

    @property
    def args(self):
        """arguments for resource construction"""
        # work on a copy so the stored params keep their 'extra' entry
        construction_args = self.data['params'].copy()
        if 'extra' in construction_args:
            del construction_args['extra']
        return construction_args

    @property
    def extra(self):
        """extra resource information"""
        params = self.data['params']
        return params.get('extra', {})

    def asdict(self):
        """Return the four known keys of the entry as a new dict."""
        keys = ('cls', 'params', 'acquired', 'avail')
        return {key: getattr(self, key) for key in keys}

    def update(self, data):
        """apply updated information from the exporter on the coordinator"""
        # copy first: the caller's dict must not receive the default keys
        fresh = data.copy()
        for key, fallback in (('acquired', None), ('avail', False)):
            fresh.setdefault(key, fallback)
        self.data = fresh

    def acquire(self, place_name):
        """Mark the resource as acquired by ``place_name``.

        The resource must not currently be acquired.
        """
        assert self.data['acquired'] is None
        self.data['acquired'] = place_name

    def release(self):
        """Clear the acquired marker."""
        # ignore repeated releases
        self.data['acquired'] = None
# This class requires eq=True, since we put the matches into a list and require
# the cmp functions to be able to remove the matches from the list later on.
@attr.s(eq=True, repr=False, str=False)
class ResourceMatch:
    """Pattern ``exporter/group/cls[/name]`` used to select resources.

    Each component is matched with :func:`fnmatch.fnmatchcase`.
    """
    exporter = attr.ib()
    group = attr.ib()
    cls = attr.ib()
    name = attr.ib(default=None)
    # rename is just metadata, so don't use it for comparing matches
    rename = attr.ib(default=None, eq=False)

    @classmethod
    def fromstr(cls, pattern):
        """Build a match from a ``/``-separated pattern string.

        Raises:
            ValueError: if the pattern does not have three or four components.
        """
        components = pattern.split("/")
        # two or three slashes <=> three or four components
        if len(components) not in (3, 4):
            message = "invalid pattern format '{}' (use 'exporter/group/cls/name')"
            raise ValueError(message.format(pattern))
        return cls(*components)

    def __repr__(self):
        """Return the pattern as ``exporter/group/cls`` plus ``/name`` if set."""
        pieces = [self.exporter, self.group, self.cls]
        if self.name is not None:
            pieces.append(self.name)
        return "/".join("{}".format(piece) for piece in pieces)

    def __str__(self):
        """Return the repr, followed by `` -> rename`` when a rename is set."""
        text = repr(self)
        return (text + " -> " + self.rename) if self.rename else text

    def ismatch(self, resource_path):
        """Return True if this matches the given resource"""
        exporter, group, cls, name = resource_path
        required = (
            (exporter, self.exporter),
            (group, self.group),
            (cls, self.cls),
        )
        if not all(fnmatchcase(value, glob) for value, glob in required):
            return False
        # an unset/empty name pattern matches every resource name
        return bool(not self.name or fnmatchcase(name, self.name))
@attr.s(eq=False)
class Place:
    """A named place with aliases, tags, resource matches and acquisition state."""
    name = attr.ib()
    aliases = attr.ib(default=attr.Factory(set), converter=set)
    comment = attr.ib(default="")
    tags = attr.ib(default=attr.Factory(dict))
    matches = attr.ib(default=attr.Factory(list))
    acquired = attr.ib(default=None)
    acquired_resources = attr.ib(default=attr.Factory(list))
    allowed = attr.ib(default=attr.Factory(set), converter=set)
    created = attr.ib(default=attr.Factory(time.time))
    changed = attr.ib(default=attr.Factory(time.time))
    reservation = attr.ib(default=None)

    @staticmethod
    def _resource_path(resource):
        """Return the path for an entry of ``acquired_resources``."""
        # in the coordinator, we have resource objects, otherwise just a path
        if isinstance(resource, (tuple, list)):
            return resource
        return resource.path

    def asdict(self):
        """Return the place (without its name) as a dict of plain values.

        Acquired resources are reduced to their paths and matches are
        converted with ``attr.asdict``.
        """
        acquired_resources = [
            self._resource_path(resource)
            for resource in self.acquired_resources  # pylint: disable=not-an-iterable
        ]
        return {
            'aliases': list(self.aliases),
            'comment': self.comment,
            'tags': self.tags,
            'matches': [attr.asdict(x) for x in self.matches],
            'acquired': self.acquired,
            'acquired_resources': acquired_resources,
            'allowed': list(self.allowed),
            'created': self.created,
            'changed': self.changed,
            'reservation': self.reservation,
        }

    def update(self, config):
        """Set attributes from the ``config`` mapping.

        Every key must be a field of this class; ``name`` may be present but
        must equal the current name.
        """
        fields = attr.fields_dict(type(self))
        for k, v in config.items():
            assert k in fields
            if k == 'name':
                # we cannot rename places
                assert v == self.name
                continue
            setattr(self, k, v)

    def show(self, level=0):
        """Print a human-readable description, indented by ``level``."""
        indent = ' ' * level
        if self.aliases:
            print(indent + "aliases: {}".format(', '.join(self.aliases)))
        if self.comment:
            print(indent + "comment: {}".format(self.comment))
        if self.tags:
            print(indent + "tags: {}".format(
                ', '.join(k+"="+v for k, v in sorted(self.tags.items()))
            ))
        print(indent + "matches:")
        for match in self.matches:  # pylint: disable=not-an-iterable
            print(indent + " {}".format(match))
        print(indent + "acquired: {}".format(self.acquired))
        print(indent + "acquired resources:")
        for resource in self.acquired_resources:  # pylint: disable=not-an-iterable
            resource_path = self._resource_path(resource)
            match = self.getmatch(resource_path)
            # getmatch() returns None when no match covers this resource;
            # print such a resource without a rename instead of failing
            # on ``None.rename``.
            rename = match.rename if match is not None else None
            if rename:
                print(indent + " {} -> {}".format(
                    '/'.join(resource_path), rename))
            else:
                print(indent + " {}".format(
                    '/'.join(resource_path)))
        if self.allowed:
            print(indent + "allowed: {}".format(', '.join(self.allowed)))
        print(indent + "created: {}".format(datetime.fromtimestamp(self.created)))
        print(indent + "changed: {}".format(datetime.fromtimestamp(self.changed)))
        if self.reservation:
            print(indent + "reservation: {}".format(self.reservation))

    def getmatch(self, resource_path):
        """Return the ResourceMatch object for the given resource path or None if not found.

        A resource_path has the structure (exporter, group, cls, name).
        """
        for match in self.matches:  # pylint: disable=not-an-iterable
            if match.ismatch(resource_path):
                return match
        return None

    def hasmatch(self, resource_path):
        """Return True if this place has a ResourceMatch object for the given resource path.

        A resource_path has the structure (exporter, group, cls, name).
        """
        return self.getmatch(resource_path) is not None

    def touch(self):
        """Set ``changed`` to the current time."""
        self.changed = time.time()
@enum.unique
class ReservationState(enum.Enum):
    """States a reservation can be in.

    ``enum.unique`` makes a duplicated value fail at class creation instead
    of silently turning one state into an alias of another.
    """
    waiting = 0
    allocated = 1
    acquired = 2
    expired = 3
    invalid = 4
@attr.s(eq=False)
class Reservation:
    """A reservation made by ``owner``, identified by a generated token."""
    owner = attr.ib(validator=attr.validators.instance_of(str))
    # ten characters drawn from uppercase ASCII letters and digits
    token = attr.ib(
        default=attr.Factory(
            lambda: ''.join(
                random.choice(string.ascii_uppercase+string.digits)
                for _ in range(10)
            )
        )
    )
    # accepts either a ReservationState or the name of one
    state = attr.ib(
        default='waiting',
        converter=lambda x: x if isinstance(x, ReservationState) else ReservationState[x],
        validator=attr.validators.instance_of(ReservationState),
    )
    prio = attr.ib(default=0.0, validator=attr.validators.instance_of(float))
    # a dictionary of name -> filter dicts
    filters = attr.ib(
        default=attr.Factory(dict),
        validator=attr.validators.instance_of(dict),
    )
    # a dictionary of name -> place names
    allocations = attr.ib(
        default=attr.Factory(dict),
        validator=attr.validators.instance_of(dict),
    )
    created = attr.ib(default=attr.Factory(time.time))
    # by default a reservation times out 60 seconds after creation
    timeout = attr.ib(default=attr.Factory(lambda: time.time() + 60))

    def asdict(self):
        """Return the reservation (without its token) as a dict of plain values."""
        result = {}
        result['owner'] = self.owner
        result['state'] = self.state.name
        result['prio'] = self.prio
        result['filters'] = self.filters
        result['allocations'] = self.allocations
        result['created'] = self.created
        result['timeout'] = self.timeout
        return result

    def refresh(self, delta=60):
        """Push the timeout to at least ``delta`` seconds from now.

        An existing timeout that lies further in the future is kept.
        """
        candidate = time.time() + delta
        self.timeout = max(self.timeout, candidate)

    @property
    def expired(self):
        """True once the timeout lies in the past"""
        return self.timeout < time.time()

    def show(self, level=0):
        """Print a human-readable description, indented by ``level``."""
        indent = ' ' * level
        print(indent + "owner: {}".format(self.owner))
        print(indent + "token: {}".format(self.token))
        print(indent + "state: {}".format(self.state.name))
        if self.prio:
            print(indent + "prio: {}".format(self.prio))
        print(indent + "filters:")
        for name, criteria in self.filters.items():
            pairs = " ".join(k+"="+v for k, v in criteria.items())
            print(indent + " {}: {}".format(name, pairs))
        if self.allocations:
            print(indent + "allocations:")
            for name, places in self.allocations.items():
                print(indent + " {}: {}".format(name, ", ".join(places)))
        print(indent + "created: {}".format(datetime.fromtimestamp(self.created)))
        print(indent + "timeout: {}".format(datetime.fromtimestamp(self.timeout)))
def enable_tcp_nodelay(session):
    """
    Set TCP_NODELAY on the socket underlying the given session.

    asyncio/autobahn does not set TCP_NODELAY by default, so we need to do it
    like this for now.
    """
    transport = session._transport.transport
    sock = transport.get_extra_info('socket')
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)