diff --git a/consumers/mqtt.py b/consumers/mqtt.py index 57b6d8f..8e54776 100644 --- a/consumers/mqtt.py +++ b/consumers/mqtt.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import json +from time import sleep from typing import Any, Dict, List, Optional from uuid import uuid4 @@ -123,6 +124,8 @@ class MqttConsumer(BaseConsumer): self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self) self.client.on_connect = self.on_connect self.client.on_message = self.on_message + self.client.on_disconnect = self.on_disconnect + self.client.on_connect_fail = self.on_connect_fail # Will must be set before connecting!! self.client.will_set( f"{self.topic_prefix}/available", payload="offline", retain=True @@ -203,13 +206,32 @@ class MqttConsumer(BaseConsumer): f"{userdata.topic_prefix}/available", payload="online", retain=True ) + @staticmethod + def on_connect_fail(client: mqtt.Client, userdata: "MqttConsumer"): + print(userdata.__class__.__name__, "on_connect_fail") + # The callback for when a PUBLISH message is received from the server. @staticmethod def on_message(client, userdata, msg): print(msg.topic + " " + str(msg.payload)) + @staticmethod + def on_disconnect(client: mqtt.Client, userdata: "MqttConsumer", rc, prop=None): + print(userdata.__class__.__name__, "on_disconnect", rc) + def poll(self): - self.client.loop(timeout=0.1, max_packets=5) + res = self.client.loop(timeout=0.1, max_packets=5) + + if res != mqtt.MQTT_ERR_SUCCESS: + print(self.__class__.__name__, "loop returned non-success:", res) + try: + sleep(1) + res = self.client.reconnect() + if res != mqtt.MQTT_ERR_SUCCESS: + print(self.__class__.__name__, "Reconnect failed:", res) + except (OSError, mqtt.WebsocketConnectionError) as err: + print(self.__class__.__name__, "Reconnect failed:", err) + return super().poll() def write(self, data: Dict[str, Any]):