From 4266893111f608ac083874fb9193db9a7d75ff98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Odd=20Str=C3=A5b=C3=B8?= Date: Mon, 8 Nov 2021 21:41:47 +0100 Subject: [PATCH] Refractor data parsing functions --- solar_ble.py | 159 ++++++++++++++++++++++++++++----------------------- 1 file changed, 89 insertions(+), 70 deletions(-) diff --git a/solar_ble.py b/solar_ble.py index 683c138..1baafe0 100755 --- a/solar_ble.py +++ b/solar_ble.py @@ -6,7 +6,7 @@ import struct import sys import time from io import RawIOBase -from typing import Optional, cast +from typing import Callable, Collection, Optional, cast from bluepy import btle from libscrc import modbus @@ -159,85 +159,104 @@ CMD_ = b"\xff\x78\x00\x00\x00\x01" # ?: load_enabled +class DataItem: + name: str + st_format: str + unit: Optional[str] + transformation: Optional[Callable] + + def __init__( + self, + name: str, + st_format: str, + unit: Optional[str] = None, + transform: Optional[Callable] = None, + ): + self.name = name + self.st_format = st_format + self.unit = unit + self.transformation = transform + + if self.st_format[0] not in "@=<>!": + self.st_format = "!" + self.st_format + + @property + def st_size(self) -> int: + return struct.calcsize(self.st_format) + + def transform(self, data): + if self.transformation is None: + return data + + return self.transformation(data) + + def parse_temperature(bin): if bin & 0x80: return (bin & 0x7F) * -1 return bin & 0x7F -# GET_BATTERY_STATE -def parse_battery_state(data: bytes) -> dict: - res = dict( - zip( - ( - "battery_charge", # % - "battery_voltage", # V - "battery_current", # A - "internal_temperature", # °C - "battery_temperature", # °C - "load_voltage", # V - "load_current", # A - "load_power", # W - "panel_voltage", # V - "panel_current", # A - "panel_power", # W - "load_enabled", # bool - ), - struct.unpack("!HHHBBHHHHHHx?", data), - ) - ) - res["battery_voltage"] /= 10 - res["battery_current"] /= 100 - res["load_voltage"] /= 10 - res["load_current"] /= 100 - res["internal_temperature"] = parse_temperature(res["internal_temperature"]) - res["battery_temperature"] = parse_temperature(res["battery_temperature"]) - res["panel_voltage"] /= 10 - res["panel_current"] /= 100 +DATA_BATTERY_STATE = [ + DataItem("battery_charge", "H", "%"), + DataItem("battery_voltage", "H", "V", lambda n: n / 10), + DataItem("battery_current", "H", "A", lambda n: n / 100), + DataItem("internal_temperature", "B", "°C", parse_temperature), + DataItem("battery_temperature", "B", "°C", parse_temperature), + DataItem("load_voltage", "H", "V", lambda n: n / 10), + DataItem("load_current", "H", "A", lambda n: n / 100), + DataItem("load_power", "H", "W"), + DataItem("panel_voltage", "H", "V", lambda n: n / 10), + DataItem("panel_current", "H", "A", lambda n: n / 100), + DataItem("panel_power", "H", "W"), + DataItem("load_enabled", "x?", transform=bool), +] + + +def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict: + pos = offset + res = {} + + for i in items: + res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0]) + pos += i.st_size return res -def parse_historical_entry(data: bytes) -> dict: - res = dict( - zip( - ( - "battery_voltage_min", # V - "battery_voltage_max", # V - "charge_max_current", # A - "_discharge_max_current?", # A - "charge_max_power", # W - "discharge_max_power", # W - "charge_amp_hour", # Ah - "discharge_amp_hour", # Ah - "production_power", # Wh - "consumption_power", # Wh - ), - struct.unpack_from("!10H", data), - ) - ) - res["battery_voltage_min"] /= 10 - res["battery_voltage_max"] /= 10 - res["charge_max_current"] /= 100 - res["_discharge_max_current?"] /= 100 +# GET_BATTERY_STATE +def parse_battery_state(data: bytes) -> dict: + return parse(data, DATA_BATTERY_STATE) - if len(data) > 20: - res.update( - dict( - zip( - ( - "run_days", - "discharge_count", - "full_charge_count", - "total_charge_amp_hours", # Ah - "total_discharge_amp_hours", # Ah - "total_production_power", # Wh - "total_consumption_power", # Wh - ), - struct.unpack_from("!3H4L", data, offset=20), - ), - ) - ) + +HISTORICAL_DATA = [ + DataItem("battery_voltage_min", "H", "V", lambda n: n / 10), + DataItem("battery_voltage_max", "H", "V", lambda n: n / 10), + DataItem("charge_max_current", "H", "A", lambda n: n / 100), + DataItem("_discharge_max_current?", "H", "A", lambda n: n / 100), + DataItem("charge_max_power", "H", "W"), + DataItem("discharge_max_power", "H", "W"), + DataItem("charge_amp_hour", "H", "Ah"), + DataItem("discharge_amp_hour", "H", "Ah"), + DataItem("production_power", "H", "Wh"), + DataItem("consumption_power", "H", "Wh"), + DataItem("run_days", "H"), + DataItem("discharge_count", "H"), + DataItem("full_charge_count", "H"), + DataItem("total_charge_amp_hours", "L", "Ah"), + DataItem("total_discharge_amp_hours", "L", "Ah"), + DataItem("total_production_power", "L", "Wh"), + DataItem("total_consumption_power", "L", "Wh"), +] + + +def parse_historical_entry(data: bytes) -> dict: + res = parse(data, HISTORICAL_DATA[:10]) + + res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]]) + + if len(data) > res_datalen: + res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen)) return res @@ -325,7 +344,7 @@ if __name__ == "__main__": while True: try: log("Connecting...") - with BTLEUart(MAC, timeout=30) as dev: + with BTLEUart(MAC, timeout=10) as dev: log("Connected.") # write(dev, construct_request(0, 32))