Refractor data parsing functions

This commit is contained in:
Odd Stråbø 2021-11-08 21:41:47 +01:00
parent b168b5120f
commit 4266893111

View file

@ -6,7 +6,7 @@ import struct
import sys
import time
from io import RawIOBase
from typing import Optional, cast
from typing import Callable, Collection, Optional, cast
from bluepy import btle
from libscrc import modbus
@ -159,85 +159,104 @@ CMD_ = b"\xff\x78\x00\x00\x00\x01"
# ?: load_enabled
class DataItem:
name: str
st_format: str
unit: Optional[str]
transformation: Optional[Callable]
def __init__(
self,
name: str,
st_format: str,
unit: Optional[str] = None,
transform: Optional[Callable] = None,
):
self.name = name
self.st_format = st_format
self.unit = unit
self.transformation = transform
if self.st_format[0] not in "@=<>!":
self.st_format = "!" + self.st_format
@property
def st_size(self) -> int:
return struct.calcsize(self.st_format)
def transform(self, data):
if self.transformation is None:
return data
return self.transformation(data)
def parse_temperature(bin):
if bin & 0x80:
return (bin & 0x7F) * -1
return bin & 0x7F
# GET_BATTERY_STATE
def parse_battery_state(data: bytes) -> dict:
res = dict(
zip(
(
"battery_charge", # %
"battery_voltage", # V
"battery_current", # A
"internal_temperature", # °C
"battery_temperature", # °C
"load_voltage", # V
"load_current", # A
"load_power", # W
"panel_voltage", # V
"panel_current", # A
"panel_power", # W
"load_enabled", # bool
),
struct.unpack("!HHHBBHHHHHHx?", data),
)
)
res["battery_voltage"] /= 10
res["battery_current"] /= 100
res["load_voltage"] /= 10
res["load_current"] /= 100
res["internal_temperature"] = parse_temperature(res["internal_temperature"])
res["battery_temperature"] = parse_temperature(res["battery_temperature"])
res["panel_voltage"] /= 10
res["panel_current"] /= 100
DATA_BATTERY_STATE = [
DataItem("battery_charge", "H", "%"),
DataItem("battery_voltage", "H", "V", lambda n: n / 10),
DataItem("battery_current", "H", "A", lambda n: n / 100),
DataItem("internal_temperature", "B", "°C", parse_temperature),
DataItem("battery_temperature", "B", "°C", parse_temperature),
DataItem("load_voltage", "H", "V", lambda n: n / 10),
DataItem("load_current", "H", "A", lambda n: n / 100),
DataItem("load_power", "H", "W"),
DataItem("panel_voltage", "H", "V", lambda n: n / 10),
DataItem("panel_current", "H", "A", lambda n: n / 100),
DataItem("panel_power", "H", "W"),
DataItem("load_enabled", "x?", transform=bool),
]
def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
pos = offset
res = {}
for i in items:
res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0])
pos += i.st_size
return res
def parse_historical_entry(data: bytes) -> dict:
res = dict(
zip(
(
"battery_voltage_min", # V
"battery_voltage_max", # V
"charge_max_current", # A
"_discharge_max_current?", # A
"charge_max_power", # W
"discharge_max_power", # W
"charge_amp_hour", # Ah
"discharge_amp_hour", # Ah
"production_power", # Wh
"consumption_power", # Wh
),
struct.unpack_from("!10H", data),
)
)
res["battery_voltage_min"] /= 10
res["battery_voltage_max"] /= 10
res["charge_max_current"] /= 100
res["_discharge_max_current?"] /= 100
# GET_BATTERY_STATE
def parse_battery_state(data: bytes) -> dict:
return parse(data, DATA_BATTERY_STATE)
if len(data) > 20:
res.update(
dict(
zip(
(
"run_days",
"discharge_count",
"full_charge_count",
"total_charge_amp_hours", # Ah
"total_discharge_amp_hours", # Ah
"total_production_power", # Wh
"total_consumption_power", # Wh
),
struct.unpack_from("!3H4L", data, offset=20),
),
)
)
HISTORICAL_DATA = [
DataItem("battery_voltage_min", "H", "V", lambda n: n / 10),
DataItem("battery_voltage_max", "H", "V", lambda n: n / 10),
DataItem("charge_max_current", "H", "A", lambda n: n / 100),
DataItem("_discharge_max_current?", "H", "A", lambda n: n / 100),
DataItem("charge_max_power", "H", "W"),
DataItem("discharge_max_power", "H", "W"),
DataItem("charge_amp_hour", "H", "Ah"),
DataItem("discharge_amp_hour", "H", "Ah"),
DataItem("production_power", "H", "Wh"),
DataItem("consumption_power", "H", "Wh"),
DataItem("run_days", "H"),
DataItem("discharge_count", "H"),
DataItem("full_charge_count", "H"),
DataItem("total_charge_amp_hours", "L", "Ah"),
DataItem("total_discharge_amp_hours", "L", "Ah"),
DataItem("total_production_power", "L", "Wh"),
DataItem("total_consumption_power", "L", "Wh"),
]
def parse_historical_entry(data: bytes) -> dict:
res = parse(data, HISTORICAL_DATA[:10])
res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]])
if len(data) > res_datalen:
res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen))
return res
@ -325,7 +344,7 @@ if __name__ == "__main__":
while True:
try:
log("Connecting...")
with BTLEUart(MAC, timeout=30) as dev:
with BTLEUart(MAC, timeout=10) as dev:
log("Connected.")
# write(dev, construct_request(0, 32))