Add mqtt reconnect handling

This commit is contained in:
Odd Stråbø 2022-01-26 20:41:59 +01:00
parent b5740d78c8
commit 00fb5f1dee
1 changed files with 23 additions and 1 deletions

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import json
from time import sleep
from typing import Any, Dict, List, Optional
from uuid import uuid4
@ -123,6 +124,8 @@ class MqttConsumer(BaseConsumer):
self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.on_connect_fail = self.on_connect_fail
# Will must be set before connecting!!
self.client.will_set(
f"{self.topic_prefix}/available", payload="offline", retain=True
@ -203,13 +206,32 @@ class MqttConsumer(BaseConsumer):
f"{userdata.topic_prefix}/available", payload="online", retain=True
)
@staticmethod
def on_connect_fail(client: mqtt.Client, userdata: "MqttConsumer"):
print(userdata.__class__.__name__, "on_connect_fail")
# The callback for when a PUBLISH message is received from the server.
@staticmethod
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
@staticmethod
def on_disconnect(client: mqtt.Client, userdata: "MqttConsumer", rc, prop=None):
print(userdata.__class__.__name__, "on_disconnect", rc)
def poll(self):
self.client.loop(timeout=0.1, max_packets=5)
res = self.client.loop(timeout=0.1, max_packets=5)
if res != mqtt.MQTT_ERR_SUCCESS:
print(self.__class__.__name__, "loop returned non-success:", res)
try:
sleep(1)
res = self.client.reconnect()
if res != mqtt.MQTT_ERR_SUCCESS:
print(self.__class__.__name__, "Reconnect failed:", res)
except (OSError, mqtt.WebsocketConnectionError) as err:
print(self.__class__.__name__, "Reconnect failed:", err)
return super().poll()
def write(self, data: Dict[str, Any]):