Refractor some. Add parsing of historical data.

This commit is contained in:
Odd Stråbø 2021-11-08 04:29:43 +01:00
parent 2cef664c7f
commit 5e87e3edd6
1 changed files with 95 additions and 38 deletions

View File

@ -6,6 +6,7 @@ import struct
import sys import sys
import time import time
from io import RawIOBase from io import RawIOBase
from typing import Optional, cast
from bluepy import btle from bluepy import btle
from libscrc import modbus from libscrc import modbus
@ -13,7 +14,6 @@ from libscrc import modbus
from feasycom_ble import BTLEUart from feasycom_ble import BTLEUart
MAC = "DC:0D:30:9C:61:BA" MAC = "DC:0D:30:9C:61:BA"
INTERVAL = 15
# write_service = "0000ffd0-0000-1000-8000-00805f9b34fb" # write_service = "0000ffd0-0000-1000-8000-00805f9b34fb"
# read_service = "0000fff0-0000-1000-8000-00805f9b34fb" # read_service = "0000fff0-0000-1000-8000-00805f9b34fb"
@ -158,8 +158,6 @@ CMD_ = b"\xff\x78\x00\x00\x00\x01"
# 3: Panel power: 4 W # 3: Panel power: 4 W
# ?: load_enabled # ?: load_enabled
STATUS = {}
def parse_temperature(bin): def parse_temperature(bin):
if bin & 0x80: if bin & 0x80:
@ -172,16 +170,20 @@ def parse_battery_state(data: bytes) -> dict:
res = dict( res = dict(
zip( zip(
( (
"battery_charge", "battery_charge", # %
"battery_voltage", "battery_voltage", # V
"battery_current", "battery_current", # A
"_internal_temperature?", "_internal_temperature?", # °C
"battery_temperature", "battery_temperature", # °C
"load_voltage", "load_voltage", # V
"load_current", "load_current", # A
"load_power", "load_power", # W
"panel_voltage", # V
"panel_current", # A
"panel_power", # W
"load_enabled", # bool
), ),
struct.unpack("!HHHBBHHH", data), struct.unpack("!HHHBBHHHHHHx?", data),
) )
) )
res["battery_voltage"] /= 10 res["battery_voltage"] /= 10
@ -190,29 +192,51 @@ def parse_battery_state(data: bytes) -> dict:
res["load_current"] /= 100 res["load_current"] /= 100
res["_internal_temperature?"] = parse_temperature(res["_internal_temperature?"]) res["_internal_temperature?"] = parse_temperature(res["_internal_temperature?"])
res["battery_temperature"] = parse_temperature(res["battery_temperature"]) res["battery_temperature"] = parse_temperature(res["battery_temperature"])
STATUS.update(res) res["panel_voltage"] /= 10
res["panel_current"] /= 100
log(str(res))
return res return res
# GET_PANEL_STATUS def parse_historical_entry(data: bytes) -> dict:
def parse_panel_status(data: bytes) -> dict:
res = dict( res = dict(
zip( zip(
("panel_voltage", "panel_current", "panel_power", "load_enabled"), (
struct.unpack("!HHHx?", data), "battery_voltage_min", # V
"battery_voltage_max", # V
"unknown1",
"unknown2",
"charge_max_power",
"discharge_max_power",
"charge_amp_hour",
"discharge_amp_hour",
"production_power",
"consumption_power",
),
struct.unpack_from("!10H", data),
) )
) )
res["panel_voltage"] /= 10 res["battery_voltage_min"] /= 10
res["panel_current"] /= 100 res["battery_voltage_max"] /= 10
STATUS.update(res)
# elif operation == 6 and cc == 1: if len(data) > 20:
# res = dict(zip(("load_enabled",), struct.unpack("!xxxxx?xx", data))) res.update(
# STATUS.update(res) dict(
zip(
(
"run_days",
"discharge_count",
"full_charge_count",
"total_charge_amp_hours",
"total_discharge_amp_hours",
"total_production_power",
"total_consumption_power",
),
struct.unpack_from("!3H4L", data, offset=20),
),
)
)
log(str(res))
return res return res
@ -228,8 +252,8 @@ def construct_request(address, words=1, action=ACTION_READ, marker=0xFF):
return struct.pack("!BBHH", marker, action, address, words) return struct.pack("!BBHH", marker, action, address, words)
def log(string: str): def log(message: object):
print(datetime.datetime.utcnow().isoformat(" "), string) print(datetime.datetime.utcnow().isoformat(" "), message)
sys.stdout.flush() sys.stdout.flush()
@ -268,9 +292,28 @@ def readMemory(fh: RawIOBase, address: int, words: int = 1):
return data return data
class Periodical:
prev: float
interval: float
def __init__(self, interval: float, start: Optional[float] = None):
self.prev = time.time() - interval if start is None else start
self.interval = interval
def __call__(self, now: Optional[float] = None) -> bool:
if now is None:
now = time.time()
if (now - self.prev) >= self.interval:
self.prev += self.interval
return True
return False
if __name__ == "__main__": if __name__ == "__main__":
prev = time.time() - INTERVAL per_voltages = Periodical(interval=15)
per_current_hist = Periodical(interval=60)
while True: while True:
try: try:
@ -284,21 +327,33 @@ if __name__ == "__main__":
# for address in range(0, 0x10000, 16): # for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...") # log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16)) # write(wd, construct_request(address, 16))
days = 7
res = readMemory(dev, 0x010B, 21)
if res:
d = parse_historical_entry(res)
log(d)
days = cast(int, d.get("run_days", 7))
for i in range(days):
res = readMemory(dev, 0xF000 + i, 10)
if res:
d = parse_historical_entry(res)
log({i: d})
while True: while True:
now = time.time() now = time.time()
diff = now - prev if per_voltages(now):
if diff >= INTERVAL: # CMD_GET_BATTERY_STATE + CMD_GET_PANEL_STATUS
prev += INTERVAL res = readMemory(dev, 0x0100, 11)
res = readMemory(dev, 0x0107, 4) # CMD_GET_PANEL_STATUS
if res: if res:
parse_panel_status(res) d = parse_battery_state(res)
log(d)
res = readMemory(dev, 0x0100, 7) # CMD_GET_BATTERY_STATE if per_current_hist(now):
res = readMemory(dev, 0x010B, 21)
if res: if res:
parse_battery_state(res) d = parse_historical_entry(res)
log(d)
# print(".")
time.sleep(1) time.sleep(1)
# if STATUS.get('load_enabled'): # if STATUS.get('load_enabled'):
@ -309,3 +364,5 @@ if __name__ == "__main__":
except btle.BTLEDisconnectError: except btle.BTLEDisconnectError:
log("ERROR: Disconnected") log("ERROR: Disconnected")
time.sleep(1) time.sleep(1)
except KeyboardInterrupt:
break