import socket
import time
import enum
import random
import re
import string
from datetime import datetime
from fnmatch import fnmatchcase
import attr
__all__ = [
'TAG_KEY',
'TAG_VAL',
'ResourceEntry',
'ResourceMatch',
'Place',
'ReservationState',
'Reservation',
'enable_tcp_nodelay',
'monkey_patch_max_msg_payload_size_ws_option',
]
TAG_KEY = re.compile(r"[a-z][a-z0-9_]+")
TAG_VAL = re.compile(r"[a-z0-9_]?")
[docs]
@attr.s(eq=False)
class ResourceEntry:
data = attr.ib() # cls, params
[docs]
def __attrs_post_init__(self):
self.data.setdefault('acquired', None)
self.data.setdefault('avail', False)
@property
def acquired(self):
return self.data['acquired']
@property
def avail(self):
return self.data['avail']
@property
def cls(self):
return self.data['cls']
@property
def params(self):
return self.data['params']
@property
def args(self):
"""arguments for resource construction"""
args = self.data['params'].copy()
args.pop('extra', None)
return args
@property
def extra(self):
"""extra resource information"""
return self.data['params'].get('extra', {})
[docs]
def asdict(self):
return {
'cls': self.cls,
'params': self.params,
'acquired': self.acquired,
'avail': self.avail,
}
[docs]
def update(self, data):
"""apply updated information from the exporter on the coordinator"""
data = data.copy()
data.setdefault('acquired', None)
data.setdefault('avail', False)
self.data = data
[docs]
def acquire(self, place_name):
assert self.data['acquired'] is None
self.data['acquired'] = place_name
[docs]
def release(self):
# ignore repeated releases
self.data['acquired'] = None
[docs]
@attr.s(eq=True, repr=False, str=False)
# This class requires eq=True, since we put the matches into a list and require
# the cmp functions to be able to remove the matches from the list later on.
class ResourceMatch:
exporter = attr.ib()
group = attr.ib()
cls = attr.ib()
name = attr.ib(default=None)
# rename is just metadata, so don't use it for comparing matches
rename = attr.ib(default=None, eq=False)
[docs]
@classmethod
def fromstr(cls, pattern):
if not 2 <= pattern.count("/") <= 3:
raise ValueError(
f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')"
)
return cls(*pattern.split("/"))
[docs]
def __repr__(self):
result = f"{self.exporter}/{self.group}/{self.cls}"
if self.name is not None:
result += f"/{self.name}"
return result
[docs]
def __str__(self):
result = repr(self)
if self.rename:
result += " -> " + self.rename
return result
[docs]
def ismatch(self, resource_path):
"""Return True if this matches the given resource"""
try:
exporter, group, cls, name = resource_path
except ValueError:
exporter, group, cls = resource_path
name = None
if not fnmatchcase(exporter, self.exporter):
return False
if not fnmatchcase(group, self.group):
return False
if not fnmatchcase(cls, self.cls):
return False
if name and self.name and not fnmatchcase(name, self.name):
return False
return True
[docs]
@attr.s(eq=False)
class Place:
name = attr.ib()
aliases = attr.ib(default=attr.Factory(set), converter=set)
comment = attr.ib(default="")
tags = attr.ib(default=attr.Factory(dict))
matches = attr.ib(default=attr.Factory(list))
acquired = attr.ib(default=None)
acquired_resources = attr.ib(default=attr.Factory(list))
allowed = attr.ib(default=attr.Factory(set), converter=set)
created = attr.ib(default=attr.Factory(time.time))
changed = attr.ib(default=attr.Factory(time.time))
reservation = attr.ib(default=None)
[docs]
def asdict(self):
# in the coordinator, we have resource objects, otherwise just a path
acquired_resources = []
for resource in self.acquired_resources:
if isinstance(resource, (tuple, list)):
acquired_resources.append(resource)
else:
acquired_resources.append(resource.path)
return {
'aliases': list(self.aliases),
'comment': self.comment,
'tags': self.tags,
'matches': [attr.asdict(x) for x in self.matches],
'acquired': self.acquired,
'acquired_resources': acquired_resources,
'allowed': list(self.allowed),
'created': self.created,
'changed': self.changed,
'reservation': self.reservation,
}
[docs]
def update(self, config):
fields = attr.fields_dict(type(self))
for k, v in config.items():
assert k in fields
if k == 'name':
# we cannot rename places
assert v == self.name
continue
setattr(self, k, v)
[docs]
def show(self, level=0):
indent = ' ' * level
if self.aliases:
print(indent + f"aliases: {', '.join(sorted(self.aliases))}")
if self.comment:
print(indent + f"comment: {self.comment}")
if self.tags:
print(indent + f"tags: {', '.join(k + '=' + v for k, v in sorted(self.tags.items()))}")
print(indent + "matches:")
for match in sorted(self.matches):
print(indent + f" {match}")
print(indent + f"acquired: {self.acquired}")
print(indent + "acquired resources:")
# in the coordinator, we have resource objects, otherwise just a path
for resource in sorted(self.acquired_resources):
if isinstance(resource, (tuple, list)):
resource_path = resource
else:
resource_path = resource.path
match = self.getmatch(resource_path)
if match.rename:
print(indent + f" {'/'.join(resource_path)} -> {match.rename}")
else:
print(indent + f" {'/'.join(resource_path)}")
if self.allowed:
print(indent + f"allowed: {', '.join(self.allowed)}")
print(indent + f"created: {datetime.fromtimestamp(self.created)}")
print(indent + f"changed: {datetime.fromtimestamp(self.changed)}")
if self.reservation:
print(indent + f"reservation: {self.reservation}")
[docs]
def getmatch(self, resource_path):
"""Return the ResourceMatch object for the given resource path or None if not found.
A resource_path has the structure (exporter, group, cls, name).
"""
for match in self.matches:
if match.ismatch(resource_path):
return match
return None
[docs]
def hasmatch(self, resource_path):
"""Return True if this place as a ResourceMatch object for the given resource path.
A resource_path has the structure (exporter, group, cls, name).
"""
return self.getmatch(resource_path) is not None
[docs]
def unmatched(self, resource_paths):
"""Returns a match which could not be matched to the list of resource_path
A resource_path has the structure (exporter, group, cls, name).
"""
for match in self.matches:
if not any([match.ismatch(resource) for resource in resource_paths]):
return match
[docs]
def touch(self):
self.changed = time.time()
[docs]
class ReservationState(enum.Enum):
waiting = 0
allocated = 1
acquired = 2
expired = 3
invalid = 4
[docs]
@attr.s(eq=False)
class Reservation:
owner = attr.ib(validator=attr.validators.instance_of(str))
token = attr.ib(default=attr.Factory(
lambda: ''.join(random.choice(string.ascii_uppercase+string.digits) for i in range(10))))
state = attr.ib(
default='waiting',
converter=lambda x: x if isinstance(x, ReservationState) else ReservationState[x],
validator=attr.validators.instance_of(ReservationState))
prio = attr.ib(default=0.0, validator=attr.validators.instance_of(float))
# a dictionary of name -> filter dicts
filters = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
# a dictionary of name -> place names
allocations = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
created = attr.ib(default=attr.Factory(time.time))
timeout = attr.ib(default=attr.Factory(lambda: time.time() + 60))
[docs]
def asdict(self):
return {
'owner': self.owner,
'state': self.state.name,
'prio': self.prio,
'filters': self.filters,
'allocations': self.allocations,
'created': self.created,
'timeout': self.timeout,
}
[docs]
def refresh(self, delta=60):
self.timeout = max(self.timeout, time.time() + delta)
@property
def expired(self):
return self.timeout < time.time()
[docs]
def show(self, level=0):
indent = ' ' * level
print(indent + f"owner: {self.owner}")
print(indent + f"token: {self.token}")
print(indent + f"state: {self.state.name}")
if self.prio:
print(indent + f"prio: {self.prio}")
print(indent + "filters:")
for name, fltr in self.filters.items():
print(indent + f" {name}: {' '.join([(k + '=' + v) for k, v in fltr.items()])}")
if self.allocations:
print(indent + "allocations:")
for name, allocation in self.allocations.items():
print(indent + f" {name}: {', '.join(allocation)}")
print(indent + f"created: {datetime.fromtimestamp(self.created)}")
print(indent + f"timeout: {datetime.fromtimestamp(self.timeout)}")
[docs]
def enable_tcp_nodelay(session):
"""
asyncio/autobahn does not set TCP_NODELAY by default, so we need to do it
like this for now.
"""
s = session._transport.transport.get_extra_info('socket')
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
[docs]
def monkey_patch_max_msg_payload_size_ws_option():
"""
The default maxMessagePayloadSize in autobahn is 1M. For larger setups with a big number of
exported resources, this becomes the limiting factor.
Increase maxMessagePayloadSize in WampWebSocketClientFactory.setProtocolOptions() by monkey
patching it, so autobahn.asyncio.wamp.ApplicationRunner effectively sets the increased value.
This function must be called before ApplicationRunner is instanciated.
"""
from autobahn.asyncio.websocket import WampWebSocketClientFactory
original_method = WampWebSocketClientFactory.setProtocolOptions
def set_protocol_options(*args, **kwargs):
new_max_message_payload_size = 10485760
# maxMessagePayloadSize given as positional arg
args = list(args)
try:
args[9] = max((args[9], new_max_message_payload_size))
except IndexError:
pass
# maxMessagePayloadSize given as kwarg
kwarg_name = "maxMessagePayloadSize"
if kwarg_name in kwargs and kwargs[kwarg_name] is not None:
kwargs[kwarg_name] = max((kwargs[kwarg_name], new_max_message_payload_size))
return original_method(*args, **kwargs)
WampWebSocketClientFactory.setProtocolOptions = set_protocol_options