import socket
import time
import enum
import random
import re
import string
from datetime import datetime
from fnmatch import fnmatchcase
import attr
__all__ = [
'TAG_KEY',
'TAG_VAL',
'ResourceEntry',
'ResourceMatch',
'Place',
'ReservationState',
'Reservation',
'enable_tcp_nodelay',
]
TAG_KEY = re.compile(r"[a-z][a-z0-9_]+")
TAG_VAL = re.compile(r"[a-z0-9_]?")
[docs]
@attr.s(eq=False)
class ResourceEntry:
data = attr.ib() # cls, params
[docs]
def __attrs_post_init__(self):
self.data.setdefault('acquired', None)
self.data.setdefault('avail', False)
@property
def acquired(self):
return self.data['acquired']
@property
def avail(self):
return self.data['avail']
@property
def cls(self):
return self.data['cls']
@property
def params(self):
return self.data['params']
@property
def args(self):
"""arguments for resource construction"""
args = self.data['params'].copy()
args.pop('extra', None)
return args
@property
def extra(self):
"""extra resource information"""
return self.data['params'].get('extra', {})
[docs]
def asdict(self):
return {
'cls': self.cls,
'params': self.params,
'acquired': self.acquired,
'avail': self.avail,
}
[docs]
def update(self, data):
"""apply updated information from the exporter on the coordinator"""
data = data.copy()
data.setdefault('acquired', None)
data.setdefault('avail', False)
self.data = data
[docs]
def acquire(self, place_name):
assert self.data['acquired'] is None
self.data['acquired'] = place_name
[docs]
def release(self):
# ignore repeated releases
self.data['acquired'] = None
[docs]
@attr.s(eq=True, repr=False, str=False)
# This class requires eq=True, since we put the matches into a list and require
# the cmp functions to be able to remove the matches from the list later on.
class ResourceMatch:
exporter = attr.ib()
group = attr.ib()
cls = attr.ib()
name = attr.ib(default=None)
# rename is just metadata, so don't use it for comparing matches
rename = attr.ib(default=None, eq=False)
[docs]
@classmethod
def fromstr(cls, pattern):
if not 2 <= pattern.count("/") <= 3:
raise ValueError(
f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')"
)
return cls(*pattern.split("/"))
[docs]
def __repr__(self):
result = f"{self.exporter}/{self.group}/{self.cls}"
if self.name is not None:
result += f"/{self.name}"
return result
[docs]
def __str__(self):
result = repr(self)
if self.rename:
result += " -> " + self.rename
return result
[docs]
def ismatch(self, resource_path):
"""Return True if this matches the given resource"""
try:
exporter, group, cls, name = resource_path
except ValueError:
exporter, group, cls = resource_path
name = None
if not fnmatchcase(exporter, self.exporter):
return False
if not fnmatchcase(group, self.group):
return False
if not fnmatchcase(cls, self.cls):
return False
if name and self.name and not fnmatchcase(name, self.name):
return False
return True
[docs]
@attr.s(eq=False)
class Place:
name = attr.ib()
aliases = attr.ib(default=attr.Factory(set), converter=set)
comment = attr.ib(default="")
tags = attr.ib(default=attr.Factory(dict))
matches = attr.ib(default=attr.Factory(list))
acquired = attr.ib(default=None)
acquired_resources = attr.ib(default=attr.Factory(list))
allowed = attr.ib(default=attr.Factory(set), converter=set)
created = attr.ib(default=attr.Factory(time.time))
changed = attr.ib(default=attr.Factory(time.time))
reservation = attr.ib(default=None)
[docs]
def asdict(self):
# in the coordinator, we have resource objects, otherwise just a path
acquired_resources = []
for resource in self.acquired_resources:
if isinstance(resource, (tuple, list)):
acquired_resources.append(resource)
else:
acquired_resources.append(resource.path)
return {
'aliases': list(self.aliases),
'comment': self.comment,
'tags': self.tags,
'matches': [attr.asdict(x) for x in self.matches],
'acquired': self.acquired,
'acquired_resources': acquired_resources,
'allowed': list(self.allowed),
'created': self.created,
'changed': self.changed,
'reservation': self.reservation,
}
[docs]
def update(self, config):
fields = attr.fields_dict(type(self))
for k, v in config.items():
assert k in fields
if k == 'name':
# we cannot rename places
assert v == self.name
continue
setattr(self, k, v)
[docs]
def show(self, level=0):
indent = ' ' * level
if self.aliases:
print(indent + f"aliases: {', '.join(sorted(self.aliases))}")
if self.comment:
print(indent + f"comment: {self.comment}")
if self.tags:
print(indent + f"tags: {', '.join(k + '=' + v for k, v in sorted(self.tags.items()))}")
print(indent + "matches:")
for match in sorted(self.matches):
print(indent + f" {match}")
print(indent + f"acquired: {self.acquired}")
print(indent + "acquired resources:")
# in the coordinator, we have resource objects, otherwise just a path
for resource in sorted(self.acquired_resources):
if isinstance(resource, (tuple, list)):
resource_path = resource
else:
resource_path = resource.path
match = self.getmatch(resource_path)
if match.rename:
print(indent + f" {'/'.join(resource_path)} -> {match.rename}")
else:
print(indent + f" {'/'.join(resource_path)}")
if self.allowed:
print(indent + f"allowed: {', '.join(self.allowed)}")
print(indent + f"created: {datetime.fromtimestamp(self.created)}")
print(indent + f"changed: {datetime.fromtimestamp(self.changed)}")
if self.reservation:
print(indent + f"reservation: {self.reservation}")
[docs]
def getmatch(self, resource_path):
"""Return the ResourceMatch object for the given resource path or None if not found.
A resource_path has the structure (exporter, group, cls, name).
"""
for match in self.matches:
if match.ismatch(resource_path):
return match
return None
[docs]
def hasmatch(self, resource_path):
"""Return True if this place as a ResourceMatch object for the given resource path.
A resource_path has the structure (exporter, group, cls, name).
"""
return self.getmatch(resource_path) is not None
[docs]
def unmatched(self, resource_paths):
"""Returns a match which could not be matched to the list of resource_path
A resource_path has the structure (exporter, group, cls, name).
"""
for match in self.matches:
if not any([match.ismatch(resource) for resource in resource_paths]):
return match
[docs]
def touch(self):
self.changed = time.time()
[docs]
class ReservationState(enum.Enum):
waiting = 0
allocated = 1
acquired = 2
expired = 3
invalid = 4
[docs]
@attr.s(eq=False)
class Reservation:
owner = attr.ib(validator=attr.validators.instance_of(str))
token = attr.ib(default=attr.Factory(
lambda: ''.join(random.choice(string.ascii_uppercase+string.digits) for i in range(10))))
state = attr.ib(
default='waiting',
converter=lambda x: x if isinstance(x, ReservationState) else ReservationState[x],
validator=attr.validators.instance_of(ReservationState))
prio = attr.ib(default=0.0, validator=attr.validators.instance_of(float))
# a dictionary of name -> filter dicts
filters = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
# a dictionary of name -> place names
allocations = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
created = attr.ib(default=attr.Factory(time.time))
timeout = attr.ib(default=attr.Factory(lambda: time.time() + 60))
[docs]
def asdict(self):
return {
'owner': self.owner,
'state': self.state.name,
'prio': self.prio,
'filters': self.filters,
'allocations': self.allocations,
'created': self.created,
'timeout': self.timeout,
}
[docs]
def refresh(self, delta=60):
self.timeout = max(self.timeout, time.time() + delta)
@property
def expired(self):
return self.timeout < time.time()
[docs]
def show(self, level=0):
indent = ' ' * level
print(indent + f"owner: {self.owner}")
print(indent + f"token: {self.token}")
print(indent + f"state: {self.state.name}")
if self.prio:
print(indent + f"prio: {self.prio}")
print(indent + "filters:")
for name, fltr in self.filters.items():
print(indent + f" {name}: {' '.join([(k + '=' + v) for k, v in fltr.items()])}")
if self.allocations:
print(indent + "allocations:")
for name, allocation in self.allocations.items():
print(indent + f" {name}: {', '.join(allocation)}")
print(indent + f"created: {datetime.fromtimestamp(self.created)}")
print(indent + f"timeout: {datetime.fromtimestamp(self.timeout)}")
[docs]
def enable_tcp_nodelay(session):
"""
asyncio/autobahn does not set TCP_NODELAY by default, so we need to do it
like this for now.
"""
s = session._transport.transport.get_extra_info('socket')
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)