Source code for labgrid.resource.udev

from functools import partial

import attr
import os
import pyudev
import warnings

from ..factory import target_factory
from .common import ManagedResource, ResourceManager
from .base import SerialPort, EthernetInterface


@attr.s
[docs]class UdevManager(ResourceManager): def __attrs_post_init__(self): super().__attrs_post_init__() self._context = pyudev.Context() self._monitor = pyudev.Monitor.from_netlink(self._context) self._monitor.start()
[docs] def on_resource_added(self, resource): devices = self._context.list_devices() devices.match_subsystem(resource.match['SUBSYSTEM']) for device in devices: if resource.try_match(device): break
[docs] def poll(self): for device in iter(partial(self._monitor.poll, 0), None): print("{0.action}: {0}".format(device)) for resource in self.resources: print(" {}".format(resource)) if resource.try_match(device): break
@attr.s
[docs]class USBResource(ManagedResource): manager_cls = UdevManager match = attr.ib(validator=attr.validators.instance_of(dict), hash=False) device = attr.ib(default=None, hash=False) def __attrs_post_init__(self): self.timeout = 5.0 self.match.setdefault('SUBSYSTEM', 'usb') super().__attrs_post_init__()
[docs] def filter_match(self, device): return True
[docs] def try_match(self, device): if self.device: if self.device.sys_path != device.sys_path: return False else: # new device def match_single(dev, key, value): if dev.get(key) == value: return True elif dev.attributes.get(key) == value: return True elif getattr(dev, key, None) == value: return True return False def match_ancestors(key, value): for ancestor in device.ancestors: if match_single(ancestor, key, value): return True return False for k, v in self.match.items(): if k.startswith('@'): if not match_ancestors(k[1:], v): return False else: if not match_single(device, k, v): return False if not self.filter_match(device): return False print(" found match: {}".format(self)) if device.action in [None, 'add']: if self.avail: warnings.warn("udev device {} is already available".format(device)) self.avail = True self.device = device elif device.action in ['change', 'move']: self.device = device else: self.avail = False self.device = None self.update() return True
[docs] def update(self): pass
@property def busnum(self): if self.device: return int(self.device.get('BUSNUM')) @property def devnum(self): if self.device: return int(self.device.get('DEVNUM')) def _get_usb_device(self): device = self.device if self.device and (self.device.subsystem != 'usb' or self.device.device_type != 'usb_device'): device = self.device.find_parent('usb', 'usb_device') return device @property def path(self): device = self._get_usb_device() if device: return str(device.sys_name) @property def vendor_id(self): device = self._get_usb_device() if device: return int(device.get('ID_VENDOR_ID'), 16) @property def model_id(self): device = self._get_usb_device() if device: return int(device.get('ID_MODEL_ID'), 16)
[docs] def read_attr(self, attribute): """read uncached attribute value from sysfs pyudev currently supports only cached access to attributes, so we read directly from sysfs. """ # FIXME update pyudev to support udev_device_set_sysattr_value(dev, # attr, None) to clear the cache if self.device: with open(os.path.join(self.device.sys_path, attribute), 'rb') as f: return f.read().rstrip(b'\n') # drop trailing newlines
@target_factory.reg_resource @attr.s
[docs]class USBSerialPort(SerialPort, USBResource): def __attrs_post_init__(self): self.match['SUBSYSTEM'] = 'tty' super().__attrs_post_init__()
[docs] def update(self): super().update() if self.device: self.port = self.device.device_node else: self.port = None
@target_factory.reg_resource @attr.s
[docs]class USBMassStorage(USBResource): def __attrs_post_init__(self): self.match['SUBSYSTEM'] = 'block' self.match['DEVTYPE'] = 'disk' self.match['@SUBSYSTEM'] = 'usb' super().__attrs_post_init__() @property def path(self): return self.device.device_node
@target_factory.reg_resource @attr.s
[docs]class IMXUSBLoader(USBResource):
[docs] def filter_match(self, device): if device.get('ID_VENDOR_ID') != "15a2": return False if device.get('ID_MODEL_ID') not in ["0054", "0061"]: return False return super().filter_match(device)
@target_factory.reg_resource @attr.s
[docs]class MXSUSBLoader(USBResource):
[docs] def filter_match(self, device): if device.get('ID_VENDOR_ID') != "066f": return False if device.get('ID_MODEL_ID') not in ["3780"]: return False return super().filter_match(device)
@target_factory.reg_resource @attr.s
[docs]class AndroidFastboot(USBResource):
[docs] def filter_match(self, device): if device.get('ID_VENDOR_ID') != "1d6b": return False if device.get('ID_MODEL_ID') != "0104": return False return super().filter_match(device)
@target_factory.reg_resource @attr.s
[docs]class USBEthernetInterface(EthernetInterface, USBResource): def __attrs_post_init__(self): self.match['SUBSYSTEM'] = 'net' self.match['@SUBSYSTEM'] = 'usb' super().__attrs_post_init__()
[docs] def update(self): super().update() if self.device: self.ifname = self.device.get('INTERFACE') else: self.ifname = None
@property def if_state(self): value = self.read_attr('operstate') if value is not None: value = value.decode('ascii') return value
@target_factory.reg_resource @attr.s
[docs]class AlteraUSBBlaster(USBResource):
[docs] def filter_match(self, device): if device.get('ID_VENDOR_ID') != "09fb": return False if device.get('ID_MODEL_ID') != "6810": return False return super().filter_match(device)