Move types to separate file, start using enum

This commit is contained in:
Odd Stråbø 2021-11-18 23:09:46 +01:00
parent 50978111c5
commit 57709242fa
4 changed files with 212 additions and 138 deletions

View File

@ -5,65 +5,95 @@ from uuid import uuid4
import paho.mqtt.client as mqtt
from solar_types import DataName
from . import BaseConsumer
MAP_VALUES: Dict[str, Dict[str, Any]] = {
# "battery_voltage_min",
# "battery_voltage_max",
# "charge_max_current",
# "_discharge_max_current?",
# "charge_max_power",
# "discharge_max_power",
# "charge_amp_hour",
# "discharge_amp_hour",
"production_power": {
MAP_VALUES: Dict[DataName, Dict[str, Any]] = {
# DataName.BATTERY_VOLTAGE_MIN: {},
# DataName.BATTERY_VOLTAGE_MAX: {},
# DataName.CHARGE_MAX_CURRENT: {},
# DataName._DISCHARGE_MAX_CURRENT: {},
# DataName.CHARGE_MAX_POWER: {},
# DataName.DISCHARGE_MAX_POWER: {},
# DataName.CHARGE_AMP_HOUR: {},
# DataName.DISCHARGE_AMP_HOUR: {},
DataName.PRODUCTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
},
"consumption_power": {
DataName.CONSUMPTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
},
# "run_days",
# "discharge_count",
# "full_charge_count",
# "total_charge_amp_hours",
# "total_discharge_amp_hours",
"total_production_power": {
# DataName.RUN_DAYS: {},
# DataName.DISCHARGE_COUNT: {},
# DataName.FULL_CHARGE_COUNT: {},
# DataName.TOTAL_CHARGE_AMP_HOURS: {},
# DataName.TOTAL_DISCHARGE_AMP_HOURS: {},
DataName.TOTAL_PRODUCTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
"expiry": 180,
},
"total_consumption_power": {
DataName.TOTAL_CONSUMPTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
"expiry": 180,
},
#
"battery_charge": {"unit": "%", "type": "battery", "state_class": "measurement"},
"battery_voltage": {"unit": "V", "type": "voltage", "state_class": "measurement"},
"battery_current": {"unit": "A", "type": "current", "state_class": "measurement"},
"internal_temperature": {
DataName.BATTERY_CHARGE: {
"unit": "%",
"type": "battery",
"state_class": "measurement",
},
DataName.BATTERY_VOLTAGE: {
"unit": "V",
"type": "voltage",
"state_class": "measurement",
},
DataName.BATTERY_CURRENT: {
"unit": "A",
"type": "current",
"state_class": "measurement",
},
DataName.INTERNAL_TEMPERATURE: {
"unit": "°C",
"type": "temperature",
"state_class": "measurement",
},
"battery_temperature": {
DataName.BATTERY_TEMPERATURE: {
"unit": "°C",
"type": "temperature",
"state_class": "measurement",
},
"load_voltage": {"unit": "V", "type": "voltage", "state_class": "measurement"},
"load_current": {"unit": "A", "type": "current", "state_class": "measurement"},
"load_power": {"unit": "W", "type": "power", "state_class": "measurement"},
"panel_voltage": {"unit": "V", "type": "voltage", "state_class": "measurement"},
"panel_current": {"unit": "A", "type": "current", "state_class": "measurement"},
"panel_power": {"unit": "W", "type": "power", "state_class": "measurement"},
# "load_enabled",
DataName.LOAD_VOLTAGE: {
"unit": "V",
"type": "voltage",
"state_class": "measurement",
},
DataName.LOAD_CURRENT: {
"unit": "A",
"type": "current",
"state_class": "measurement",
},
DataName.LOAD_POWER: {"unit": "W", "type": "power", "state_class": "measurement"},
DataName.PANEL_VOLTAGE: {
"unit": "V",
"type": "voltage",
"state_class": "measurement",
},
DataName.PANEL_CURRENT: {
"unit": "A",
"type": "current",
"state_class": "measurement",
},
DataName.PANEL_POWER: {"unit": "W", "type": "power", "state_class": "measurement"},
# DataName.LOAD_ENABLED: {},
}
@ -173,12 +203,13 @@ class MqttConsumer(BaseConsumer):
for k, v in data.items():
if k in MAP_VALUES:
if k not in self.initialized:
km = MAP_VALUES[DataName(k)]
pretty_name = k.replace("_", " ").capitalize()
disc_prefix = self.settings["discovery_prefix"]
device_id = self.settings["device_id"]
self.client.publish(
f"{self.settings['discovery_prefix']}/sensor/{self.settings['device_id']}_{k}/config", # noqa: E501
payload=json.dumps(
self.get_ha_config(k, pretty_name, **MAP_VALUES[k])
),
f"{disc_prefix}/sensor/{device_id}_{k}/config",
payload=json.dumps(self.get_ha_config(k, pretty_name, **km)),
retain=True,
)
self.initialized.append(k)

View File

@ -7,6 +7,8 @@ from typing import Any, Dict
import rrdtool
from solar_types import DataName
DT_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
START = (
@ -15,39 +17,39 @@ START = (
)
# 2021-11-12 16:58:32.262030
HISTORICAL_KEYS = {
"battery_voltage_min",
"battery_voltage_max",
"charge_max_current",
"_discharge_max_current?",
"charge_max_power",
"discharge_max_power",
"charge_amp_hour",
"discharge_amp_hour",
"production_power",
"consumption_power",
"run_days",
"discharge_count",
"full_charge_count",
"total_charge_amp_hours",
"total_discharge_amp_hours",
"total_production_power",
"total_consumption_power",
DataName.BATTERY_VOLTAGE_MIN,
DataName.BATTERY_VOLTAGE_MAX,
DataName.CHARGE_MAX_CURRENT,
DataName._DISCHARGE_MAX_CURRENT,
DataName.CHARGE_MAX_POWER,
DataName.DISCHARGE_MAX_POWER,
DataName.CHARGE_AMP_HOUR,
DataName.DISCHARGE_AMP_HOUR,
DataName.PRODUCTION_ENERGY,
DataName.CONSUMPTION_ENERGY,
DataName.RUN_DAYS,
DataName.DISCHARGE_COUNT,
DataName.FULL_CHARGE_COUNT,
DataName.TOTAL_CHARGE_AMP_HOURS,
DataName.TOTAL_DISCHARGE_AMP_HOURS,
DataName.TOTAL_PRODUCTION_ENERGY,
DataName.TOTAL_CONSUMPTION_ENERGY,
}
# 2021-11-12 16:58:47.521142
INSTANT_KEYS = {
"battery_charge",
"battery_voltage",
"battery_current",
"internal_temp",
"battery_temp",
"load_voltage",
"load_current",
"load_power",
"panel_voltage",
"panel_current",
"panel_power",
"load_enabled",
DataName.BATTERY_CHARGE,
DataName.BATTERY_VOLTAGE,
DataName.BATTERY_CURRENT,
DataName.INTERNAL_TEMPERATURE,
DataName.BATTERY_TEMPERATURE,
DataName.LOAD_VOLTAGE,
DataName.LOAD_CURRENT,
DataName.LOAD_POWER,
DataName.PANEL_VOLTAGE,
DataName.PANEL_CURRENT,
DataName.PANEL_POWER,
DataName.LOAD_ENABLED,
}
KNOWN_KEYS = HISTORICAL_KEYS.union(INSTANT_KEYS)

View File

@ -6,12 +6,13 @@ import struct
import sys
import time
from io import RawIOBase
from typing import Callable, Collection, Optional, cast
from typing import Collection, Optional, cast
from bluepy import btle
from libscrc import modbus
from feasycom_ble import BTLEUart
from solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem
from test_config import get_config, get_consumers
MAC = "DC:0D:30:9C:61:BA"
@ -190,59 +191,6 @@ def humanize_number(data, unit: str = ""):
return f"{data:.3g} {prefix}{unit}"
class DataItem:
name: str
st_format: str
unit: Optional[str]
transformation: Optional[Callable]
def __init__(
self,
name: str,
st_format: str,
unit: Optional[str] = None,
transform: Optional[Callable] = None,
):
self.name = name
self.st_format = st_format
self.unit = unit
self.transformation = transform
if self.st_format[0] not in "@=<>!":
self.st_format = "!" + self.st_format
@property
def st_size(self) -> int:
return struct.calcsize(self.st_format)
def transform(self, data):
if self.transformation is None:
return data
return self.transformation(data)
def parse_temperature(bin):
if bin & 0x80:
return (bin & 0x7F) * -1
return bin & 0x7F
DATA_BATTERY_STATE = [
DataItem("battery_charge", "H", "%"),
DataItem("battery_voltage", "H", "V", lambda n: n / 10),
DataItem("battery_current", "H", "A", lambda n: n / 100),
DataItem("internal_temperature", "B", "°C", parse_temperature),
DataItem("battery_temperature", "B", "°C", parse_temperature),
DataItem("load_voltage", "H", "V", lambda n: n / 10),
DataItem("load_current", "H", "A", lambda n: n / 100),
DataItem("load_power", "H", "W"),
DataItem("panel_voltage", "H", "V", lambda n: n / 10),
DataItem("panel_current", "H", "A", lambda n: n / 100),
DataItem("panel_power", "H", "W"),
DataItem("load_enabled", "x?", transform=bool),
]
def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
pos = offset
res = {}
@ -259,27 +207,6 @@ def parse_battery_state(data: bytes) -> dict:
return parse(data, DATA_BATTERY_STATE)
HISTORICAL_DATA = [
DataItem("battery_voltage_min", "H", "V", lambda n: n / 10),
DataItem("battery_voltage_max", "H", "V", lambda n: n / 10),
DataItem("charge_max_current", "H", "A", lambda n: n / 100),
DataItem("_discharge_max_current?", "H", "A", lambda n: n / 100),
DataItem("charge_max_power", "H", "W"),
DataItem("discharge_max_power", "H", "W"),
DataItem("charge_amp_hour", "H", "Ah"),
DataItem("discharge_amp_hour", "H", "Ah"),
DataItem("production_power", "H", "Wh"),
DataItem("consumption_power", "H", "Wh"),
DataItem("run_days", "H"),
DataItem("discharge_count", "H"),
DataItem("full_charge_count", "H"),
DataItem("total_charge_amp_hours", "L", "Ah"),
DataItem("total_discharge_amp_hours", "L", "Ah"),
DataItem("total_production_power", "L", "Wh"),
DataItem("total_consumption_power", "L", "Wh"),
]
def parse_historical_entry(data: bytes) -> dict:
res = parse(data, HISTORICAL_DATA[:10])

114
solar_types.py Normal file
View File

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
import struct
from enum import Enum, unique
from typing import Callable, Optional
@unique
class DataName(str, Enum):
BATTERY_CHARGE = "battery_charge"
BATTERY_VOLTAGE = "battery_voltage"
BATTERY_CURRENT = "battery_current"
INTERNAL_TEMPERATURE = "internal_temperature"
BATTERY_TEMPERATURE = "battery_temperature"
LOAD_VOLTAGE = "load_voltage"
LOAD_CURRENT = "load_current"
LOAD_POWER = "load_power"
PANEL_VOLTAGE = "panel_voltage"
PANEL_CURRENT = "panel_current"
PANEL_POWER = "panel_power"
LOAD_ENABLED = "load_enabled"
BATTERY_VOLTAGE_MIN = "battery_voltage_min"
BATTERY_VOLTAGE_MAX = "battery_voltage_max"
CHARGE_MAX_CURRENT = "charge_max_current"
_DISCHARGE_MAX_CURRENT = "_discharge_max_current?"
CHARGE_MAX_POWER = "charge_max_power"
DISCHARGE_MAX_POWER = "discharge_max_power"
CHARGE_AMP_HOUR = "charge_amp_hour"
DISCHARGE_AMP_HOUR = "discharge_amp_hour"
PRODUCTION_ENERGY = "production_energy"
CONSUMPTION_ENERGY = "consumption_energy"
RUN_DAYS = "run_days"
DISCHARGE_COUNT = "discharge_count"
FULL_CHARGE_COUNT = "full_charge_count"
TOTAL_CHARGE_AMP_HOURS = "total_charge_amp_hours"
TOTAL_DISCHARGE_AMP_HOURS = "total_discharge_amp_hours"
TOTAL_PRODUCTION_ENERGY = "total_production_energy"
TOTAL_CONSUMPTION_ENERGY = "total_consumption_energy"
def __repr__(self):
return repr(self.value)
class DataItem:
name: DataName
st_format: str
unit: Optional[str]
transformation: Optional[Callable]
def __init__(
self,
name: DataName,
st_format: str,
unit: Optional[str] = None,
transform: Optional[Callable] = None,
):
self.name = name
self.st_format = st_format
self.unit = unit
self.transformation = transform
if self.st_format[0] not in "@=<>!":
self.st_format = "!" + self.st_format
@property
def st_size(self) -> int:
return struct.calcsize(self.st_format)
def transform(self, data):
if self.transformation is None:
return data
return self.transformation(data)
def parse_temperature(bin):
if bin & 0x80:
return (bin & 0x7F) * -1
return bin & 0x7F
DATA_BATTERY_STATE = [
DataItem(DataName.BATTERY_CHARGE, "H", "%"),
DataItem(DataName.BATTERY_VOLTAGE, "H", "V", lambda n: n / 10),
DataItem(DataName.BATTERY_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.INTERNAL_TEMPERATURE, "B", "°C", parse_temperature),
DataItem(DataName.BATTERY_TEMPERATURE, "B", "°C", parse_temperature),
DataItem(DataName.LOAD_VOLTAGE, "H", "V", lambda n: n / 10),
DataItem(DataName.LOAD_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.LOAD_POWER, "H", "W"),
DataItem(DataName.PANEL_VOLTAGE, "H", "V", lambda n: n / 10),
DataItem(DataName.PANEL_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.PANEL_POWER, "H", "W"),
DataItem(DataName.LOAD_ENABLED, "x?", transform=bool),
]
HISTORICAL_DATA = [
DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10),
DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10),
DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName._DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.CHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"),
DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"),
DataItem(DataName.RUN_DAYS, "H"),
DataItem(DataName.DISCHARGE_COUNT, "H"),
DataItem(DataName.FULL_CHARGE_COUNT, "H"),
DataItem(DataName.TOTAL_CHARGE_AMP_HOURS, "L", "Ah"),
DataItem(DataName.TOTAL_DISCHARGE_AMP_HOURS, "L", "Ah"),
DataItem(DataName.TOTAL_PRODUCTION_ENERGY, "L", "Wh"),
DataItem(DataName.TOTAL_CONSUMPTION_ENERGY, "L", "Wh"),
]