Source code for labgrid.driver.mqtt

#!/usr/bin/env python3

import time

import attr

from .common import Driver
from ..factory import target_factory
from ..protocol import PowerProtocol
from ..step import step
from ..util import Timeout


[docs] class MQTTError(Exception): pass
[docs] @target_factory.reg_driver @attr.s(eq=False) class TasmotaPowerDriver(Driver, PowerProtocol): bindings = { "power": {"TasmotaPowerPort"} } delay = attr.ib(default=2.0, validator=attr.validators.instance_of(float)) _client = attr.ib(default=None) _status = attr.ib(default=None)
[docs] def __attrs_post_init__(self): super().__attrs_post_init__() import paho.mqtt.client as mqtt self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
[docs] def on_activate(self): self._client.on_message = self._on_message self._client.on_connect = self._on_connect self._client.connect(self.power.host) self._client.loop_start()
[docs] def on_deactivate(self): self._client.loop_stop()
def _on_message(self, client, userdata, msg): if msg.payload == b'ON': status = True elif msg.payload == b'OFF': status = False else: raise ValueError(f"Unknown status: {msg.payload}. Must be 'ON' or 'OFF'") self._status = status def _on_connect(self, client, userdata, flags, reason_code, properties): client.subscribe(self.power.status_topic) def _publish(self, topic, payload): msg = self._client.publish(topic, payload=payload) timeout = Timeout(3.0) while not msg.is_published: time.sleep(0.1) if timeout.expired: raise MQTTError("publish timed out") return msg
[docs] @Driver.check_active @step() def on(self): self._publish(self.power.power_topic, "ON") timeout = Timeout(3.0) while self._status is False: time.sleep(0.1) if timeout.expired: raise MQTTError("Port did not change status within 3 seconds")
[docs] @Driver.check_active @step() def off(self): self._publish(self.power.power_topic, "OFF") timeout = Timeout(3.0) while self._status is True: time.sleep(0.1) if timeout.expired: raise MQTTError("Port did not change status within 3 seconds")
[docs] @Driver.check_active @step() def cycle(self): self.off() time.sleep(self.delay) self.on()
[docs] @Driver.check_active @step() def get(self): self._client.publish(self.power.power_topic) timeout = Timeout(3.0) while self._status is None: time.sleep(0.1) if timeout.expired: raise MQTTError("Could not get initial status") return self._status