srne-mqtt/consumers/mqtt.py
2021-11-14 03:32:30 +01:00

209 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
import json
from typing import Any, Dict, List, Optional
from uuid import uuid4

import paho.mqtt.client as mqtt

from . import BaseConsumer
def _sensor(unit: str, device_class: str, state_class: str) -> Dict[str, Any]:
    """Return a fresh MAP_VALUES entry (keys: unit, type, state_class)."""
    return {"unit": unit, "type": device_class, "state_class": state_class}


# Reading name -> sensor metadata. Only readings listed here are published as
# individual sensors by MqttConsumer.write(); each entry is expanded into the
# keyword arguments of MqttConsumer.get_ha_config().
# Names kept as comments are known readings that are not exposed (yet).
MAP_VALUES: Dict[str, Dict[str, Any]] = {
    # "battery_voltage_min",
    # "battery_voltage_max",
    # "charge_max_current",
    # "_discharge_max_current?",
    # "charge_max_power",
    # "discharge_max_power",
    # "charge_amp_hour",
    # "discharge_amp_hour",
    "production_power": _sensor("Wh", "energy", "total_increasing"),
    "consumption_power": _sensor("Wh", "energy", "total_increasing"),
    # "run_days",
    # "discharge_count",
    # "full_charge_count",
    # "total_charge_amp_hours",
    # "total_discharge_amp_hours",
    "total_production_power": _sensor("Wh", "energy", "total_increasing"),
    "total_consumption_power": _sensor("Wh", "energy", "total_increasing"),
    #
    "battery_charge": _sensor("%", "battery", "measurement"),
    "battery_voltage": _sensor("V", "voltage", "measurement"),
    "battery_current": _sensor("A", "current", "measurement"),
    "internal_temperature": _sensor("°C", "temperature", "measurement"),
    "battery_temperature": _sensor("°C", "temperature", "measurement"),
    "load_voltage": _sensor("V", "voltage", "measurement"),
    "load_current": _sensor("A", "current", "measurement"),
    "load_power": _sensor("W", "power", "measurement"),
    "panel_voltage": _sensor("V", "voltage", "measurement"),
    "panel_current": _sensor("A", "current", "measurement"),
    "panel_power": _sensor("W", "power", "measurement"),
    # "load_enabled",
}
class MqttConsumer(BaseConsumer):
    """Consumer that publishes readings to an MQTT broker.

    Every reading dict is published as JSON on ``<topic_prefix>/raw``. Readings
    whose key is listed in ``MAP_VALUES`` are additionally published on their
    own topic and announced once per process through a retained discovery
    config message below ``settings["discovery_prefix"]`` (default
    ``"homeassistant"``).

    NOTE(review): this class presumably relies on ``BaseConsumer.__init__``
    calling ``config()`` and storing the settings dict as ``self.settings``;
    confirm against ``BaseConsumer``.
    """

    client: mqtt.Client
    # Keys of MAP_VALUES whose discovery config has already been published.
    initialized: List[str]

    def __init__(self, settings: Dict[str, Any]) -> None:
        """Create the MQTT client and connect to the configured broker.

        Args:
            settings: Consumer settings; see ``config()`` for the keys used.
        """
        self.initialized = []
        super().__init__(settings)

        self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        # Will must be set before connecting!!
        self.client.will_set(
            f"{self.topic_prefix}/available", payload="offline", retain=True
        )

        self.client.connect(
            settings["client"]["host"],
            settings["client"]["port"],
            settings["client"]["keepalive"],
        )

    def config(self, settings: Dict[str, Any]) -> None:
        """Fill in defaults for every setting this consumer reads (in place).

        Defaults: ``client.id=None``, ``client.host=""``, ``client.port=1883``,
        ``client.keepalive=60``, ``prefix="solarmppt"``,
        ``discovery_prefix="homeassistant"``. A missing or empty ``device_id``
        is replaced by a freshly generated UUID4 string.
        """
        super().config(settings)

        settings.setdefault("client", {})
        settings["client"].setdefault("id", None)
        settings["client"].setdefault("host", "")
        settings["client"].setdefault("port", 1883)
        settings["client"].setdefault("keepalive", 60)

        if not settings.get("device_id"):
            settings["device_id"] = str(uuid4())

        settings.setdefault("prefix", "solarmppt")
        settings.setdefault("discovery_prefix", "homeassistant")

    @property
    def topic_prefix(self) -> str:
        """Base topic of this device: ``<prefix>/<device_id>``."""
        return f"{self.settings['prefix']}/{self.settings['device_id']}"

    def get_ha_config(
        self,
        id,
        name,
        unit: Optional[str] = None,
        type: Optional[str] = None,
        expiry: int = 90,
        state_class: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Build the discovery config payload for a single sensor.

        Args:
            id: Reading key; used in the unique id and the state topic.
            name: Human readable sensor name.
            unit: Unit of measurement; omitted from the payload when falsy.
            type: Device class; omitted from the payload when falsy.
            expiry: Value of the ``expire_after`` field.
            state_class: One of ``None``, ``"measurement"``, ``"total"`` or
                ``"total_increasing"``; omitted from the payload when falsy.

        Returns:
            The config as a plain dict (not yet JSON-encoded).

        Raises:
            ValueError: If ``state_class`` is not one of the accepted values.
        """
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if state_class not in (None, "measurement", "total", "total_increasing"):
            raise ValueError(f"Unsupported state_class: {state_class!r}")

        res: Dict[str, Any] = {
            # "~" is substituted into the "~/..." topics below.
            "~": f"{self.topic_prefix}",
            "unique_id": f"{self.settings['device_id']}_{id}",
            "availability_topic": "~/available",
            "state_topic": f"~/{id}",
            "name": name,
            "device": {
                "identifiers": [
                    self.settings["device_id"],
                ],
                # TODO: Get charger serial and use for identifier instead
                # See: https://www.home-assistant.io/integrations/sensor.mqtt/#device
                # "via_device": self.settings["device_id"],
            },
            "force_update": True,
            "expire_after": expiry,
        }

        if unit:
            res["unit_of_meas"] = unit
        if type:
            res["dev_cla"] = type
        if state_class:
            res["state_class"] = state_class

        # The original fell off the end here and implicitly returned None.
        return res

    # The callback for when the client receives a CONNACK response from the server.
    @staticmethod
    def on_connect(client: mqtt.Client, userdata: "MqttConsumer", flags, rc):
        """Announce the device as online once the broker has answered."""
        print("Connected with result code " + str(rc))

        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        # client.subscribe("$SYS/#")
        client.publish(
            f"{userdata.topic_prefix}/available", payload="online", retain=True
        )

    # The callback for when a PUBLISH message is received from the server.
    @staticmethod
    def on_message(client, userdata, msg):
        """Print any message received from the broker."""
        print(msg.topic + " " + str(msg.payload))

    def poll(self):
        """Run one short iteration of the MQTT network loop, then defer to the base."""
        self.client.loop(timeout=0.1, max_packets=5)
        return super().poll()

    def write(self, data: Dict[str, Any]):
        """Publish one set of readings.

        The whole dict goes to ``<topic_prefix>/raw`` as JSON. Each key that is
        listed in ``MAP_VALUES`` is published (retained) on
        ``<topic_prefix>/<key>``; the first time a key is seen its discovery
        config is published (retained) as well.

        Args:
            data: Mapping of reading name to value. Must be JSON-serialisable.
        """
        # MQTT payloads cannot be dicts; encode explicitly.
        self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))

        discovery_prefix = self.settings["discovery_prefix"]
        device_id = self.settings["device_id"]

        for k, v in data.items():
            if k not in MAP_VALUES:
                continue

            if k not in self.initialized:
                pretty_name = k.replace("_", " ").capitalize()
                # One discovery topic per sensor: with a shared topic every
                # retained config would overwrite the previous one.
                self.client.publish(
                    f"{discovery_prefix}/sensor/{device_id}_{k}/config",
                    payload=json.dumps(
                        self.get_ha_config(k, pretty_name, **MAP_VALUES[k])
                    ),
                    retain=True,
                )
                self.initialized.append(k)

            self.client.publish(f"{self.topic_prefix}/{k}", v, retain=True)

    def exit(self):
        """Mark the device offline, flush pending packets and disconnect."""
        self.client.publish(
            f"{self.topic_prefix}/available", payload="offline", retain=True
        )

        while self.client.want_write():
            # Stop flushing if the write fails (e.g. the connection is gone);
            # otherwise want_write() stays true and this loop never ends.
            if self.client.loop_write(10) != mqtt.MQTT_ERR_SUCCESS:
                break

        self.client.disconnect()
        return super().exit()
# Client(client_id="", clean_session=True, userdata=None,
# protocol=MQTTv311, transport="tcp")
# connect_srv(domain, keepalive=60, bind_address="")
# Connect to a broker using an SRV DNS lookup to obtain the broker address.
# Takes the following arguments:
# domain
# the DNS domain to search for SRV records.
# If None, try to determine the local domain name.
# client.will_set(topic, payload=None, qos=0, retain=False)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
# client.loop_forever()