Source code for labgrid.resource.mqtt

import threading
from time import monotonic

import attr

from .common import ManagedResource, ResourceManager
from ..factory import target_factory


[docs] @attr.s(eq=False) class MQTTManager(ResourceManager): _available = attr.ib(default=attr.Factory(set), validator=attr.validators.instance_of(set)) _avail_lock = attr.ib(default=threading.Lock()) _clients = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict)) _topics = attr.ib(default=attr.Factory(list), validator=attr.validators.instance_of(list)) _topic_lock = attr.ib(default=threading.Lock()) _last = attr.ib(default=0.0, validator=attr.validators.instance_of(float)) def _create_mqtt_connection(self, host, username, password): import paho.mqtt.client as mqtt client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) client.username_pw_set(username, password) client.connect(host) client.on_message = self._on_message client.loop_start() return client
[docs] def on_resource_added(self, resource): host = resource.host username = resource.username password = resource.password key = f"{host}:{username or ''}:{password or ''}" if key not in self._clients: self._clients[key] = self._create_mqtt_connection(host, username, password) self._clients[key].subscribe(resource.avail_topic)
def _on_message(self, client, userdata, msg): payload = msg.payload.decode("utf-8") topic = msg.topic if payload.lower() == "online": with self._avail_lock: self._available.add(topic) elif payload.lower() == "offline": with self._avail_lock: self._available.discard(topic)
[docs] def poll(self): if monotonic() - self._last < 2: return # ratelimit requests self._last = monotonic() with self._avail_lock: for resource in self.resources: resource.avail = resource.avail_topic in self._available
[docs] @target_factory.reg_resource @attr.s(eq=False) class MQTTResource(ManagedResource): manager_cls = MQTTManager host = attr.ib(validator=attr.validators.instance_of(str)) avail_topic = attr.ib(validator=attr.validators.instance_of(str)) username = attr.ib(default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) password = attr.ib(default=None, validator=attr.validators.optional(attr.validators.instance_of(str)))
[docs] def __attrs_post_init__(self): self.timeout = 30.0 super().__attrs_post_init__()
[docs] @target_factory.reg_resource @attr.s(eq=False) class TasmotaPowerPort(MQTTResource): power_topic = attr.ib(default=None, validator=attr.validators.instance_of(str)) status_topic = attr.ib(default=None, validator=attr.validators.instance_of(str))