Rework and restructure MQTT

This commit is contained in:
Odd Stråbø 2023-12-10 23:59:50 +01:00
parent 6c0f1c3d13
commit 67a25eeef9
1 changed files with 54 additions and 27 deletions

View File

@ -116,28 +116,37 @@ PayloadType: TypeAlias = str | bytes | bytearray | int | float | None
class MqttConsumer(BaseConsumer): class MqttConsumer(BaseConsumer):
client: mqtt.Client
initialized: List[str] initialized: List[str]
_client: mqtt.Client | None = None
def __init__(self, settings: Dict[str, Any]) -> None: def __init__(self, settings: Dict[str, Any]) -> None:
self.initialized = [] self.initialized = []
super().__init__(settings) super().__init__(settings)
self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
self.client.on_connect = self.on_connect @property
self.client.on_message = self.on_message def client(self) -> mqtt.Client:
self.client.on_disconnect = self.on_disconnect if self._client is not None:
self.client.on_connect_fail = self.on_connect_fail return self._client
self._client = mqtt.Client(
client_id=self.settings["client"]["id"], userdata=self
)
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
self._client.on_disconnect = self.on_disconnect
self._client.on_connect_fail = self.on_connect_fail
# Will must be set before connecting!! # Will must be set before connecting!!
self.client.will_set( self._client.will_set(
f"{self.topic_prefix}/available", payload="offline", retain=True f"{self.topic_prefix}/available", payload="offline", retain=True
) )
while True: while True:
try: try:
self.client.connect( self._client.connect(
settings["client"]["host"], self.settings["client"]["host"],
settings["client"]["port"], self.settings["client"]["port"],
settings["client"]["keepalive"], self.settings["client"]["keepalive"],
) )
break break
except OSError as err: except OSError as err:
@ -151,6 +160,7 @@ class MqttConsumer(BaseConsumer):
raise raise
print(err) print(err)
sleep(0.1) sleep(0.1)
return self._client
def config(self, settings: Dict[str, Any]): def config(self, settings: Dict[str, Any]):
super().config(settings) super().config(settings)
@ -167,9 +177,19 @@ class MqttConsumer(BaseConsumer):
settings.setdefault("discovery_prefix", "homeassistant") settings.setdefault("discovery_prefix", "homeassistant")
_controller_id: str | None = None
@property
def controller_id(self) -> str:
assert self.controller is not None
# Controller serial is fetched from device, cache it.
if self._controller_id is None:
self._controller_id = self.controller.serial
return f"{self.controller.manufacturer_id}_{self._controller_id}"
@property @property
def topic_prefix(self): def topic_prefix(self):
return f"{self.settings['prefix']}/{self.settings['device_id']}" return f"{self.settings['prefix']}/{self.controller_id}"
def get_ha_config( def get_ha_config(
self, self,
@ -181,21 +201,25 @@ class MqttConsumer(BaseConsumer):
state_class: Optional[str] = None, state_class: Optional[str] = None,
): ):
assert state_class in [None, "measurement", "total", "total_increasing"] assert state_class in [None, "measurement", "total", "total_increasing"]
assert self.controller is not None
res = { res = {
"~": f"{self.topic_prefix}", "~": f"{self.topic_prefix}",
"unique_id": f"{self.settings['device_id']}_{id}", "unique_id": f"{self.controller_id}_{id}",
"object_id": f"{self.controller_id}_{id}",
"availability_topic": "~/available", "availability_topic": "~/available",
"state_topic": f"~/{id}", "state_topic": f"~/{id}",
"name": name, "name": name,
"device": { "device": {
"identifiers": [ "identifiers": [
self.settings["device_id"], self.controller_id,
], ],
# TODO: Get charger serial and use for identifier instead "manufacturer": self.controller.manufacturer,
# See: https://www.home-assistant.io/integrations/sensor.mqtt/#device "model": self.controller.model,
# "via_device": self.settings["device_id"], "hw_version": self.controller.version,
"via_device": self.settings["device_id"],
"suggested_area": "Solar panel", "suggested_area": "Solar panel",
"name": self.controller.name,
}, },
"force_update": True, "force_update": True,
"expire_after": expiry, "expire_after": expiry,
@ -253,22 +277,25 @@ class MqttConsumer(BaseConsumer):
def write(self, data: Dict[str, PayloadType]): def write(self, data: Dict[str, PayloadType]):
self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data)) self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))
for k, v in data.items(): for dataname, data_value in data.items():
if k in MAP_VALUES: if dataname in MAP_VALUES:
if k not in self.initialized: if dataname not in self.initialized:
km = MAP_VALUES[DataName(k)] km = MAP_VALUES[DataName(dataname)]
pretty_name = k.replace("_", " ").capitalize() pretty_name = dataname.replace("_", " ").capitalize()
disc_prefix = self.settings["discovery_prefix"] disc_prefix = self.settings["discovery_prefix"]
device_id = self.settings["device_id"]
self.client.publish( self.client.publish(
f"{disc_prefix}/sensor/{device_id}_{k}/config", f"{disc_prefix}/sensor/{self.controller_id}/{dataname}/config",
payload=json.dumps(self.get_ha_config(k, pretty_name, **km)), payload=json.dumps(
self.get_ha_config(dataname, pretty_name, **km)
),
retain=True, retain=True,
) )
self.initialized.append(k) self.initialized.append(dataname)
self.client.publish(f"{self.topic_prefix}/{k}", v, retain=True) self.client.publish(
f"{self.topic_prefix}/{dataname}", data_value, retain=True
)
def exit(self): def exit(self):
self.client.publish( self.client.publish(