Source code for labgrid.remote.coordinator

"""The coordinator module coordinates exported resources and clients accessing them."""
# pylint: disable=no-member,unused-argument
import asyncio
import traceback
from collections import defaultdict
from os import environ
from pprint import pprint
from enum import Enum

import attr
import yaml
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationRunner, ApplicationSession
from autobahn.wamp.types import RegisterOptions

from .common import ResourceEntry, ResourceMatch, Place, enable_tcp_nodelay


class Action(Enum):
    """Kind of change applied to an exported resource by ``set_resource``."""

    ADD = 0  # a resource that did not exist before was exported
    DEL = 1  # a previously exported resource was removed
    UPD = 2  # an existing resource was replaced by new data
@attr.s(init=False, cmp=False)
class RemoteSession:
    """class encapsulating a session, used by ExporterSession and ClientSession"""
    # owning coordinator component; used by subclasses to publish events
    coordinator = attr.ib()
    # session id as reported in the WAMP session details
    session = attr.ib()
    # authentication id of the form "<kind>/<name>"
    authid = attr.ib()
    # version string reported by the peer, "unknown" until it is polled
    version = attr.ib(default="unknown", init=False)

    @property
    def key(self):
        """Key of the session"""
        return self.session

    @property
    def name(self):
        """Name of the session

        This is the part of ``authid`` after the first ``/``; an ``authid``
        without a ``/`` raises IndexError.
        """
        return self.authid.split('/', 1)[1]
@attr.s(cmp=False)
class ExporterSession(RemoteSession):
    """An ExporterSession is opened for each Exporter connecting to the
    coordinator, allowing the Exporter to get and set resources"""
    # group name -> {resource name -> ResourceEntry}
    groups = attr.ib(default=attr.Factory(dict), init=False)

    def set_resource(self, groupname, resourcename, resource):
        """Add, replace or remove one resource of this exporter.

        A truthy ``resource`` is stored (wrapped in a ResourceEntry); a falsy
        one removes the existing entry, if any. In every case a
        ``org.labgrid.coordinator.resource_changed`` event is published with
        the new entry as a dict (``{}`` when there is none).

        Returns:
            A tuple ``(action, resource_path)`` where ``action`` is
            ``Action.ADD``, ``Action.UPD``, ``Action.DEL`` or ``None`` (nothing
            existed and nothing was set) and ``resource_path`` is
            ``(exporter name, groupname, cls, resourcename)``; ``cls`` is
            ``None`` when there is neither an old nor a new entry.
        """
        group = self.groups.setdefault(groupname, {})
        old = group.get(resourcename)
        if resource:
            new = group[resourcename] = ResourceEntry(resource)
            cls = new.cls
        elif old:
            new = None
            cls = old.cls
            del group[resourcename]
        else:
            new = None
            cls = None

        self.coordinator.publish(
            'org.labgrid.coordinator.resource_changed', self.name,
            groupname, resourcename, new.asdict() if new else {}
        )

        resource_path = (self.name, groupname, cls, resourcename)
        if old and new:
            # an update must not change the class of a resource
            assert old.cls == new.cls
            return Action.UPD, resource_path
        if old:
            return Action.DEL, resource_path
        if new:
            return Action.ADD, resource_path
        return None, resource_path

    def get_resources(self):
        """Return all resources of this exporter as nested plain dicts.

        The result maps group name -> resource name -> ``asdict()`` of the
        stored entry.
        """
        return {
            groupname: {
                resourcename: resource.asdict()
                for resourcename, resource in group.items()
            }
            for groupname, group in self.groups.items()
        }
@attr.s(cmp=False)
class ClientSession(RemoteSession):
    """Session opened for each client connecting to the coordinator."""
    # starts out as an empty list; not initialised through the constructor
    acquired = attr.ib(default=attr.Factory(list), init=False)
class CoordinatorComponent(ApplicationSession):
    """WAMP session implementing the coordinator.

    It tracks exporter and client sessions, keeps the table of places,
    exposes the ``org.labgrid.coordinator.*`` procedures and persists its
    state to ``resources.yaml`` and ``places.yaml`` in the working directory.
    """

    async def onConnect(self):
        """Initialise state, restore saved places and join the realm."""
        self.sessions = {}
        self.places = {}
        self.poll_task = None
        self.save_scheduled = False

        self.load()
        self.save_later()

        enable_tcp_nodelay(self)
        self.join(self.config.realm, ["ticket"], "coordinator")

    def onChallenge(self, challenge):
        """Answer the authentication challenge with a fixed ticket."""
        return "dummy-ticket"

    async def onJoin(self, details):
        """Subscribe to session events, register all procedures and start polling."""
        await self.subscribe(self.on_session_join, 'wamp.session.on_join')
        await self.subscribe(
            self.on_session_leave, 'wamp.session.on_leave'
        )
        await self.register(
            self.attach,
            'org.labgrid.coordinator.attach',
            options=RegisterOptions(details_arg='details')
        )

        # resources
        await self.register(
            self.set_resource,
            'org.labgrid.coordinator.set_resource',
            options=RegisterOptions(details_arg='details')
        )
        await self.register(
            self.get_resources,
            'org.labgrid.coordinator.get_resources'
        )

        # places
        await self.register(
            self.add_place, 'org.labgrid.coordinator.add_place'
        )
        await self.register(
            self.del_place, 'org.labgrid.coordinator.del_place'
        )
        await self.register(
            self.add_place_alias, 'org.labgrid.coordinator.add_place_alias'
        )
        await self.register(
            self.del_place_alias, 'org.labgrid.coordinator.del_place_alias'
        )
        await self.register(
            self.set_place_comment, 'org.labgrid.coordinator.set_place_comment'
        )
        await self.register(
            self.add_place_match, 'org.labgrid.coordinator.add_place_match'
        )
        await self.register(
            self.del_place_match, 'org.labgrid.coordinator.del_place_match'
        )
        await self.register(
            self.acquire_place,
            'org.labgrid.coordinator.acquire_place',
            options=RegisterOptions(details_arg='details')
        )
        await self.register(
            self.release_place,
            'org.labgrid.coordinator.release_place',
            options=RegisterOptions(details_arg='details')
        )
        await self.register(
            self.allow_place,
            'org.labgrid.coordinator.allow_place',
            options=RegisterOptions(details_arg='details')
        )
        await self.register(
            self.get_places, 'org.labgrid.coordinator.get_places'
        )

        self.poll_task = asyncio.get_event_loop().create_task(self.poll())

        print("Coordinator ready.")

    async def _stop_poll_task(self):
        """Cancel the background poll task (if started) and wait for it to end."""
        if self.poll_task:
            self.poll_task.cancel()
            await asyncio.wait([self.poll_task])

    async def onLeave(self, details):
        """Persist state and stop polling before leaving the realm."""
        self.save()
        await self._stop_poll_task()
        super().onLeave(details)

    async def onDisconnect(self):
        """Persist state and stop polling when the transport goes away."""
        self.save()
        await self._stop_poll_task()
        await asyncio.sleep(0.5)  # give others a chance to clean up

    async def _poll_step(self):
        """Run one poll iteration: flush pending saves and probe exporters.

        Every exporter session is asked for its version; one that does not
        answer within 5 seconds is treated as gone and removed.
        """
        # save changes
        if self.save_scheduled:
            self.save()
        # poll exporters
        # iterate over a snapshot: on_session_leave() mutates self.sessions
        for session in list(self.sessions.values()):
            if isinstance(session, ExporterSession):
                fut = self.call(
                    'org.labgrid.exporter.{}.version'.format(session.name)
                )
                done, _ = await asyncio.wait([fut], timeout=5)
                if not done:
                    print('kicking exporter ({}/{})'.format(session.key, session.name))
                    await self.on_session_leave(session.key)
                    continue
                try:
                    session.version = done.pop().result()
                except wamp.exception.ApplicationError as e:
                    if e.error == "wamp.error.no_such_procedure":
                        pass  # old client
                    elif e.error == "wamp.error.canceled":
                        pass  # disconnected
                    else:
                        raise

    async def poll(self):
        """Background loop calling ``_poll_step`` every 15 seconds.

        Errors of a single step are printed and do not stop the loop;
        cancellation ends it.
        """
        loop = asyncio.get_event_loop()
        while not loop.is_closed():
            try:
                await asyncio.sleep(15.0)
                await self._poll_step()
            except asyncio.CancelledError:
                break
            except Exception:  # pylint: disable=broad-except
                traceback.print_exc()

    def save_later(self):
        """Request a save on the next poll step."""
        self.save_scheduled = True

    def save(self):
        """Write resources and places to their YAML files."""
        self.save_scheduled = False
        # Serialise first so that a failure while collecting or dumping the
        # data cannot leave an already truncated file behind.
        resources = yaml.dump(self._get_resources(), default_flow_style=False)
        places = yaml.dump(self._get_places(), default_flow_style=False)
        with open('resources.yaml', 'w') as f:
            f.write(resources)
        with open('places.yaml', 'w') as f:
            f.write(places)

    def load(self):
        """Restore ``self.places`` from ``places.yaml``.

        A missing file leaves the place table empty. Acquisition state
        (``acquired``, ``acquired_resources``, ``allowed``) is dropped on load.
        """
        self.places = {}
        try:
            with open('places.yaml', 'r') as f:
                # An explicit Loader is required by PyYAML >= 6; yaml.Loader
                # keeps the permissive behaviour of the former bare call.
                # NOTE(review): this loader can construct arbitrary Python
                # objects. That is tolerable only because the file is written
                # by save(); switch to yaml.safe_load once the dumped data is
                # confirmed to contain plain YAML types only.
                stored = yaml.load(f.read(), Loader=yaml.Loader) or {}
        except FileNotFoundError:
            return

        # Build the table separately so a failure half-way through does not
        # leave raw dicts mixed with Place objects in self.places.
        places = {}
        for placename, config in stored.items():
            config['name'] = placename
            # FIXME maybe recover previously acquired places here?
            config.pop('acquired', None)
            config.pop('acquired_resources', None)
            config.pop('allowed', None)
            config['matches'] = [ResourceMatch(**match) for match in config['matches']]
            places[placename] = Place(**config)
        self.places = places

    def _add_default_place(self, name):
        """Create a place for a purely numeric group name if none exists yet."""
        if name in self.places:
            return
        if not name.isdigit():
            return
        place = Place(name)
        print(place)
        place.matches.append(ResourceMatch(exporter="*", group=name, cls="*"))
        self.places[name] = place

    async def _update_acquired_places(self, action, resource_path):
        """Update acquired places when resources are added or removed."""
        if action not in [Action.ADD, Action.DEL]:
            return  # currently nothing needed for Action.UPD
        for placename, place in self.places.items():
            if not place.acquired:
                continue
            if not place.hasmatch(resource_path):
                continue
            if action is Action.ADD:
                place.acquired_resources.append(resource_path)
            else:
                try:
                    place.acquired_resources.remove(resource_path)
                except ValueError:
                    # The path was never recorded for this place (e.g. the
                    # match was added after the place was acquired); there is
                    # nothing to remove and nothing changed.
                    continue
            self.publish(
                'org.labgrid.coordinator.place_changed', placename, place.asdict()
            )

    async def on_session_join(self, session_details):
        """Create a session object for a joining client or exporter.

        Sessions whose authid starts with neither ``client/`` nor
        ``exporter/`` are ignored.
        """
        print('join')
        pprint(session_details)
        session = session_details['session']
        authid = session_details['authid']
        if authid.startswith('client/'):
            session = ClientSession(self, session, authid)
        elif authid.startswith('exporter/'):
            session = ExporterSession(self, session, authid)
        else:
            return
        self.sessions[session.key] = session

    async def on_session_leave(self, session_id):
        """Forget a session; for exporters, remove all of its resources."""
        print('leave ({})'.format(session_id))
        try:
            session = self.sessions.pop(session_id)
        except KeyError:
            return
        if isinstance(session, ExporterSession):
            for groupname, group in session.groups.items():
                # copy: set_resource() deletes entries from the group
                for resourcename in group.copy():
                    action, resource_path = session.set_resource(groupname, resourcename, {})
                    await self._update_acquired_places(action, resource_path)  # pylint: disable=not-an-iterable
        self.save_later()

    async def attach(self, name, details=None):
        """Legacy attach procedure.

        NOTE(review): this body cannot succeed as written -- it looks up
        ``self.sessions`` with a session object instead of a session id and
        assigns to ``self.exporters``, which this class never creates. The
        intended behaviour is not visible here, so the code is left unchanged;
        confirm whether the procedure is still needed.
        """
        # TODO check if name is in use
        session = self.sessions[details.caller]
        session_details = self.sessions[session]
        session_details['name'] = name
        self.exporters[name] = defaultdict(dict)

    async def set_resource(self, groupname, resourcename, resource, details=None):
        """Procedure for exporters to add, update or remove one resource.

        Calls from sessions unknown to the coordinator are ignored.
        """
        session = self.sessions.get(details.caller)
        if session is None:
            return
        assert isinstance(session, ExporterSession)

        groupname = str(groupname)
        resourcename = str(resourcename)
        # TODO check if acquired
        print(details)
        pprint(resource)
        action, resource_path = session.set_resource(groupname, resourcename, resource)
        if action is Action.ADD:
            self._add_default_place(groupname)
        await self._update_acquired_places(action, resource_path)  # pylint: disable=not-an-iterable
        self.save_later()

    def _get_resources(self):
        """Return the resources of all exporter sessions, keyed by exporter name."""
        result = {}
        for session in self.sessions.values():
            if isinstance(session, ExporterSession):
                result[session.name] = session.get_resources()
        return result

    async def get_resources(self, details=None):
        """Procedure returning the resources of all exporters."""
        return self._get_resources()

    async def add_place(self, name, details=None):
        """Create an empty place; returns False for an invalid or existing name."""
        if not name or not isinstance(name, str):
            return False
        if name in self.places:
            return False
        place = Place(name)
        self.places[name] = place
        self.publish(
            'org.labgrid.coordinator.place_changed', name, place.asdict()
        )
        self.save_later()
        return True

    async def del_place(self, name, details=None):
        """Delete a place; returns False for an invalid or unknown name."""
        if not name or not isinstance(name, str):
            return False
        if name not in self.places:
            return False
        del self.places[name]
        self.publish(
            'org.labgrid.coordinator.place_changed', name, {}
        )
        self.save_later()
        return True

    async def add_place_alias(self, placename, alias, details=None):
        """Add an alias to a place; returns False if the place is unknown."""
        try:
            place = self.places[placename]
        except KeyError:
            return False
        place.aliases.add(alias)
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', placename, place.asdict()
        )
        self.save_later()
        return True

    async def del_place_alias(self, placename, alias, details=None):
        """Remove an alias; returns False if the place or the alias is unknown."""
        try:
            place = self.places[placename]
        except KeyError:
            return False
        try:
            place.aliases.remove(alias)
        except (KeyError, ValueError):
            # aliases is filled with .add(), i.e. used as a set, whose
            # remove() raises KeyError; ValueError is kept for list-like use
            return False
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', placename, place.asdict()
        )
        self.save_later()
        return True

    async def set_place_comment(self, placename, comment, details=None):
        """Set the comment of a place; returns False if the place is unknown."""
        try:
            place = self.places[placename]
        except KeyError:
            return False
        place.comment = comment
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', placename, place.asdict()
        )
        self.save_later()
        return True

    async def add_place_match(self, placename, pattern, rename=None, details=None):
        """Add a match built from a ``/``-separated pattern to a place.

        Returns False if the place is unknown or already has this match.
        """
        try:
            place = self.places[placename]
        except KeyError:
            return False
        match = ResourceMatch(*pattern.split('/'), rename=rename)
        if match in place.matches:
            return False
        place.matches.append(match)
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', placename, place.asdict()
        )
        self.save_later()
        return True

    async def del_place_match(self, placename, pattern, rename=None, details=None):
        """Remove a match from a place.

        Returns False if the place is unknown or does not have this match.
        """
        try:
            place = self.places[placename]
        except KeyError:
            return False
        match = ResourceMatch(*pattern.split('/'), rename=rename)
        try:
            place.matches.remove(match)
        except ValueError:
            return False
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', placename, place.asdict()
        )
        self.save_later()
        return True

    async def acquire_place(self, name, details=None):
        """Acquire a place for the calling session.

        All currently exported resources matching the place are recorded as
        acquired. Returns False if the place is unknown or already acquired.
        """
        print(details)
        try:
            place = self.places[name]
        except KeyError:
            return False
        if place.acquired:
            return False
        # FIXME use the session object instead? or something else which
        # survives disconnecting clients?
        place.acquired = self.sessions[details.caller].name
        for exporter, groups in self._get_resources().items():
            for group_name, group in sorted(groups.items()):
                for resource_name, resource in sorted(group.items()):
                    resource_path = (exporter, group_name, resource['cls'], resource_name)
                    if not place.hasmatch(resource_path):
                        continue
                    place.acquired_resources.append(resource_path)
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', name, place.asdict()
        )
        self.save_later()
        return True

    async def release_place(self, name, details=None):
        """Release a place and clear its acquired resources and allowed users.

        Returns False if the place is unknown or not acquired.
        """
        print(details)
        try:
            place = self.places[name]
        except KeyError:
            return False
        if not place.acquired:
            return False
        place.acquired = None
        place.acquired_resources = []
        place.allowed = set()
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', name, place.asdict()
        )
        self.save_later()
        return True

    async def allow_place(self, name, user, details=None):
        """Allow another user on a place acquired by the calling session.

        Returns False if the place is unknown, not acquired, or acquired by
        someone other than the caller.
        """
        try:
            place = self.places[name]
        except KeyError:
            return False
        if not place.acquired:
            return False
        if not place.acquired == self.sessions[details.caller].name:
            return False
        place.allowed.add(user)
        place.touch()
        self.publish(
            'org.labgrid.coordinator.place_changed', name, place.asdict()
        )
        self.save_later()
        return True

    def _get_places(self):
        """Return all places as plain dicts, keyed by place name."""
        return {k: v.asdict() for k, v in self.places.items()}

    async def get_places(self, details=None):
        """Procedure returning all places."""
        return self._get_places()


if __name__ == '__main__':
    runner = ApplicationRunner(
        url=environ.get("WS", u"ws://127.0.0.1:20408/ws"),
        realm="realm1",
    )
    runner.run(CoordinatorComponent)