srne-mqtt/solar_ble.py

452 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import struct
import sys
import time
from decimal import Decimal
from io import RawIOBase
from typing import Callable, Collection, Optional, cast
from bluepy import btle
from libscrc import modbus
from feasycom_ble import BTLEUart
from solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem, DataName
from test_config import get_config, get_consumers
# Address handed to BTLEUart when connecting.
MAC = "DC:0D:30:9C:61:BA"
# write_service = "0000ffd0-0000-1000-8000-00805f9b34fb"
# read_service = "0000fff0-0000-1000-8000-00805f9b34fb"

# Function codes (second byte of a request frame).  Reads use 0x03 and writes
# use 0x06, as in the "ff 03 ..." get() and "ff 06 ..." set() frames defined
# further down in this file.  ACTION_WRITE used to be 0x03, i.e. a read.
ACTION_READ = 0x03
ACTION_WRITE = 0x06
# Values construct_request() accepts as the first (marker) byte of a frame.
POSSIBLE_MARKER = (0x01, 0xFD, 0xFE, 0xFF)
# Pre-built request frames, without the trailing CRC (write() appends it).
# Layout, as produced by construct_request(): marker byte, function code,
# 16-bit address, 16-bit word count (or the value, for the 0x06 set frames).
#
# Legend for the notes below:
#   get(marker, address, words) / set(marker, address, value)  decoded request
#   "> .."   sample response bytes (presumably captured from a real device)
#   "CC 11 22 .."  which response bytes belong to which numbered field;
#                  CC marks the byte-count byte, ?? marks unknown bytes

# get(255, 12, 2)
# "ff 03 00 0c 00 02"
CMD_GET_1 = b"\xff\x03\x00\x0c\x00\x02"
# > ff 03 04 20 20 20 20

# get(255, 12, 8)
# ff 03 00 0c 00 08
CMD_GET_MODEL = b"\xff\x03\x00\x0c\x00\x08"
# > ff 03 10 20 20 20 20 4d 4c 32 34 32 30 20 20 20 20 20 20
# Device SKU: ML2420

# get(255, 20, 4)
# ff 03 00 14 00 04
CMD_GET_VERSION = b"\xff\x03\x00\x14\x00\x04"
# > ff 03 08 00 04 02 00 02 00 00 03
# CC ?? 11 22 33 ?? 44 55 66
# Version: 4.2.0

# get(255, 24, 3)
# ff 03 00 18 00 03
CMD_GET_SERIAL = b"\xff\x03\x00\x18\x00\x03"
# > ff 03 06 3c 13 02 67 00 01
# CC 11 22 33 33 ?? ??
# SN: 60-19-0615

# get(255, 256, 7)
# ff 03 01 00 00 07
CMD_GET_BATTERY_STATE = b"\xff\x03\x01\x00\x00\x07"
# > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03
# CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88
# 1: Battery charge: 72 %
# 2: Battery voltage: 12.6 V
# 3: Battery current: 0.29 A
# 4: Internal temperature?
# 5: External temperature probe for battery, signed 8bit: 13 degC
# 6: Load voltage: 12.6 V
# 7: Load current: 0.28 A
# 8: Load power: 3 W

# get(255, 263, 4)
# ff 03 01 07 00 04
CMD_GET_PANEL_STATUS = b"\xff\x03\x01\x07\x00\x04"
# > ff 03 08 00 c8 00 14 00 04 00 01
# CC 11 11 22 22 33 33 ?? ??
# 1: Panel voltage: 20.0 V
# 2: Panel current: 0.20 A
# 3: Panel power: 4 W
# Charging status?

# set(255, 266, 1 or 0)
# ff 06 01 0a 00 01
CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01"
CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00"
# Address used by the two load-switch frames above (0x010A == 266).
REG_LOAD_ENABLE = 0x010A

# get(255, 267, 21)
# ff 03 01 0b 00 15
CMD_GET_LOAD_PARAMETERS = b"\xff\x03\x01\x0b\x00\x15"
# > ff 03 2a 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00
# > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
# > 00 00 00 00 00

# get(255, 288, 3)
# ff 03 01 20 00 03
CMD_GET_2 = b"\xff\x03\x01\x20\x00\x03"
# > ff 03 06 80 02 00 00 00 00
# CC 11 22 33 33 33 33
# 1: boolean flag?: 1
# 2: ?: 2
# 3: ?: 0

# get(255, 57345, 33)
# ff 03 e0 01 00 21
CMD_GET_BATTERY_PARAMETERS = b"\xff\x03\xe0\x01\x00\x21"
# > ff 03 42 07 d0 00 c8 ff 0c 00 02 00 a0 00 9b 00 92 00 90 00
# > 8a 00 84 00 7e 00 78 00 6f 00 6a 64 32 00 05 00 78 00 78 00
# > 1e 00 03 00 41 00 a3 00 4b 00 a3 00 00 00 00 00 00 00 00 00
# > 0f 00 05 00 05 00 04 01 00
# 33 * uint16

# Historical data: one address per day, counting back from today (0xF000).
# Note these frames use marker 0x01 rather than 0xFF.
# get(1, 61440, 10)
# 01 03 f0 00 00 0a
CMD_GET_HISTORICAL_TODAY = b"\x01\x03\xf0\x00\x00\x0a"
CMD_GET_HISTORICAL_YESTERDAY = b"\x01\x03\xf0\x01\x00\x0a"
CMD_GET_HISTORICAL_D2 = b"\x01\x03\xf0\x02\x00\x0a"
CMD_GET_HISTORICAL_D3 = b"\x01\x03\xf0\x03\x00\x0a"
# Field map for the two sample responses below (one label per 16-bit word):
# ,- battery_min_voltage
# | ,- battery_max_voltage
# | | ,- ?1 max charge %?
# | | | ,- ?2
# | | | | ,- charge_max_power
# | | | | | ,- discharge_max_power
# | | | | | | ,- charge_amp_hour
# | | | | | | | ,- discharge_amp_hour
# | | | | | | | | ,- production_power
# | | | | | | | | | ,- consumption_power
# _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___
# > 01 03 14 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00 00 00 00
# > 01 03 14 00 7c 00 7f 00 53 00 20 00 0a 00 03 00 00 00 00 00 00 00 00
# battery_min_voltage = 12.4 V
# battery_max_voltage = 12.7 V
# ?1 = 83 % ?
# ?2 =
# charge_max_power = 10 W
# discharge_max_power = 3 W
# charge_amp_hour = 0 Ah
# discharge_amp_hour = 0 Ah
# production_power = 0 Wh
# consumption_power = 0 Wh

# ff 78 00 00 00 01
# NOTE(review): the purpose of function code 0x78 is not described anywhere
# in this file -- TODO document what this frame does and give it a name.
CMD_ = b"\xff\x78\x00\x00\x00\x01"
# CMD_GET_BATTERY_STATE = b'\xff\x03\x01\x00\x00\x07'
# > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03
# CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88
# 1: Battery charge: 72 %
# 2: Battery voltage: 12.6 V
# 3: Battery current: 0.29 A
# 4: Internal temperature?
# 5: External temperature probe for battery signed 8bit: 13 degC
# 6: Load voltage: 12.6 V
# 7: Load current: 0.28 A
# 8: Load power: 3 W
# CMD_GET_PANEL_STATUS = b'\xff\x03\x01\x07\x00\x04'
# > ff 03 08 00 c8 00 14 00 04 00 01
# CC 11 11 22 22 33 33 ?? ??
# > ff 03 08 00 00 00 00 00 00 00 00
# 1: Panel voltage: 20.0 V
# 2: Panel current: 0.20 A
# 3: Panel power: 4 W
# ?: load_enabled
# Only factor of 1000
SI_PREFIXES_LARGE = "kMGTPEZY"
SI_PREFIXES_SMALL = "mµnpfazy"


def humanize_number(data, unit: str = ""):
    """Format *data* with an SI prefix, e.g. ``1500, "W"`` -> ``"1.5 kW"``.

    The magnitude is scaled in steps of 1000 until it lies in ``[1, 1000)``
    or the prefix table runs out, then rendered with three significant
    digits followed by a space, the prefix and *unit*.

    Zero is returned without a prefix, and negative values are scaled by
    their magnitude with the sign kept.  Previously zero came out as "0 y"
    and every negative value was pushed to the smallest prefix.

    Args:
        data: The number to format (int, float or Decimal).
        unit: Unit symbol appended after the prefix.

    Returns:
        The formatted string, e.g. ``"20 mA"``.
    """
    if data == 0:
        # Zero has no order of magnitude; scaling it would loop to the
        # smallest prefix.
        return f"{data:.3g} {unit}"

    sign = -1 if data < 0 else 1
    magnitude = abs(data)
    counter = 0  # > 0: index into the large table, < 0: into the small one

    while magnitude >= 1000 and counter < len(SI_PREFIXES_LARGE):
        magnitude /= 1000
        counter += 1
    while magnitude < 1 and -counter < len(SI_PREFIXES_SMALL):
        magnitude *= 1000
        counter -= 1

    if counter > 0:
        prefix = SI_PREFIXES_LARGE[counter - 1]
    elif counter < 0:
        prefix = SI_PREFIXES_SMALL[-counter - 1]
    else:
        prefix = ""
    return f"{sign * magnitude:.3g} {prefix}{unit}"
def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
    """Decode consecutive fields from *data* according to *items*.

    Starting at *offset*, each item contributes one entry: the first value
    unpacked with ``item.st_format`` at the current position is passed
    through ``item.transform`` and stored under ``item.name``; the position
    then advances by ``item.st_size``.

    Returns:
        A dict mapping every item's name to its transformed value.

    Raises:
        struct.error: if *data* is too short for one of the formats.
    """
    decoded = {}
    cursor = offset
    for item in items:
        raw_value = struct.unpack_from(item.st_format, data, cursor)[0]
        decoded[item.name] = item.transform(raw_value)
        cursor += item.st_size
    return decoded
# GET_BATTERY_STATE
def parse_battery_state(data: bytes) -> dict:
    """Decode *data* with the DATA_BATTERY_STATE field list, from offset 0."""
    return parse(data=data, items=DATA_BATTERY_STATE)
def parse_historical_entry(data: bytes) -> dict:
    """Decode a historical-data payload.

    The first ten HISTORICAL_DATA items are always decoded.  The remaining
    items are decoded only when *data* is longer than those ten occupy, and
    are read starting right after them.
    """
    leading_items = HISTORICAL_DATA[:10]
    entry = parse(data, leading_items)
    leading_size = sum(item.st_size for item in leading_items)
    if len(data) > leading_size:
        extra = parse(data, HISTORICAL_DATA[10:], offset=leading_size)
        entry.update(extra)
    return entry
def write(fh, data):
    """Append the Modbus CRC to *data* and write the frame to *fh*.

    Args:
        fh: Object with a ``write(bytes)`` method.
        data: Frame without CRC; anything ``bytes()`` accepts (bytes,
            bytearray, memoryview, or an iterable of ints 0-255).
    """
    frame = bytes(data)
    crc = modbus(frame)
    # Write the normalised ``frame`` rather than the raw ``data``: the CRC was
    # computed over it, and ``data + <bytes>`` fails for list/tuple/memoryview
    # inputs.  The CRC goes out low byte first.
    fh.write(frame + struct.pack("<H", crc & 0xFFFF))
def construct_request(address, words=1, action=ACTION_READ, marker=0xFF):
    """Build a 6-byte request frame (without CRC).

    The frame is packed big-endian as: marker (1 byte), action (1 byte),
    address (2 bytes), words (2 bytes).

    Raises:
        AssertionError: if *marker* is not one of POSSIBLE_MARKER.
        struct.error: if a value does not fit its field.
    """
    assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
    fields = (marker, action, address, words)
    return struct.pack("!BBHH", *fields)
def log(*message: object, **kwargs):
    """Print *message* prefixed with the current UTC time, then flush stdout.

    Args:
        *message: Objects forwarded to ``print`` after the timestamp.
        **kwargs: Forwarded to ``print`` unchanged (e.g. ``end``, ``file``).
    """
    # datetime.utcnow() is deprecated since Python 3.12.  Take an aware UTC
    # time and drop the tzinfo so the printed timestamp keeps its old format
    # (no "+00:00" suffix).
    stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    print(stamp.isoformat(" "), *message, **kwargs)
    sys.stdout.flush()
def parse_packet(data):
    """Split a response frame into its payload and verify the CRC.

    The frame is read as: tag (1 byte), operation (1 byte), size (1 byte),
    *size* payload bytes, then a little-endian 16-bit CRC.  The CRC is
    checked against ``modbus()`` of everything before it.

    Returns:
        The payload as a tuple of ints.

    Raises:
        ValueError: on CRC mismatch; the exception carries ``tag``,
            ``operation``, ``size``, ``payload``, ``crc`` and
            ``calculated_crc`` attributes.
        struct.error: if *data* is shorter than the frame it announces.
    """
    tag, operation, size = struct.unpack_from("BBB", data)
    *body, crc = struct.unpack_from(f"<{size}BH", data, offset=3)
    payload = tuple(body)
    calculated_crc = modbus(bytes((tag, operation, size) + payload))
    if crc == calculated_crc:
        return payload

    e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.")
    # Attach the decoded pieces so a caller can inspect the bad frame.
    details = (
        ("tag", tag),
        ("operation", operation),
        ("size", size),
        ("payload", payload),
        ("crc", crc),
        ("calculated_crc", calculated_crc),
    )
    for attribute, value in details:
        setattr(e, attribute, value)
    raise e
def discardUntil(fh: RawIOBase, byte: int, timeout=10) -> Optional[int]:
    """Read *fh* one byte at a time until *byte* is seen.

    Every other byte read is dropped and echoed to stdout in hex on a single
    "Discarding" line.

    Args:
        fh: Stream to read from; ``fh.read(1)`` may return ``None`` or empty
            bytes when nothing is available.
        byte: The value (0-255) to wait for.
        timeout: Seconds after which the search is given up.

    Returns:
        *byte* once it has been read, or ``None`` on timeout.
    """
    assert 0 <= byte < 256, f"byte: Expected 8bit unsigned int, got {byte}"

    def expand(b: Optional[bytes]) -> Optional[int]:
        # Treat both None and b"" as "nothing read"; indexing an empty
        # result used to raise IndexError.
        if not b:
            return None
        return b[0]

    # Monotonic clock: the timeout must not be affected by wall-clock jumps.
    start = time.monotonic()
    discarded = 0
    read_byte = expand(fh.read(1))
    while read_byte != byte:
        if read_byte is not None:
            if not discarded:
                log("Discarding", end="")
            discarded += 1
            print(f" {read_byte:02X}", end="")
            sys.stdout.flush()
        if time.monotonic() - start > timeout:
            read_byte = None
            break
        read_byte = expand(fh.read(1))
    if discarded:
        # Terminate the "Discarding xx xx ..." line.
        print()
        sys.stdout.flush()
    return read_byte
def readMemory(fh: RawIOBase, address: int, words: int = 1) -> Optional[bytes]:
    """Request *words* words starting at *address* and return the payload.

    Sends a read request, skips input until the 0xFF marker byte, then reads
    the operation and size bytes, *size* payload bytes and a little-endian
    16-bit CRC, and verifies the CRC.

    Returns:
        The payload bytes when the CRC matches.  ``None`` when no marker
        arrived in time, the header or CRC was incomplete, the payload was
        empty, or the CRC did not match.
    """
    # log(f"Reading {words} words from 0x{address:04X}")
    request = construct_request(address, words=words)
    # log("Request:", request)
    write(fh, request)
    tag = discardUntil(fh, 0xFF)
    if tag is None:
        return None

    header = fh.read(2)
    if not header or len(header) != 2:
        return None

    operation, size = header
    data = fh.read(size)
    _crc = fh.read(2)
    # Require both CRC bytes: a 1-byte short read used to make
    # struct.unpack_from raise struct.error out of this function.
    if data and _crc and len(_crc) == 2:
        crc = struct.unpack_from("<H", _crc)[0]
        calculated_crc = modbus(bytes([tag, operation, size, *data]))
        if crc == calculated_crc:
            return data
        log(f"readMemory: CRC error; {crc:04X} != {calculated_crc:04X}")
    # Reached for missing/short data or CRC, and after a CRC mismatch.
    log("data or crc is falsely", header, data, _crc)
    return None
class Periodical:
    """Callable that reports when at least *interval* seconds have passed.

    Calling the instance returns ``True`` once per elapsed interval and
    ``False`` otherwise.  After a ``True`` result ``prev`` moves to the start
    of the current interval (``now`` minus the overshoot), so triggers stay
    aligned to the original schedule.  Intervals that were missed entirely
    are logged as skipped.
    """

    # Start time of the current interval.
    prev: float
    # Length of one interval, in the same unit as the timestamps (seconds
    # when the default time.time() is used).
    interval: float

    def __init__(self, interval: float, start: Optional[float] = None):
        # With no explicit start, back-date by one interval so the first
        # call fires.
        if start is None:
            start = time.time() - interval
        self.prev = start
        self.interval = interval

    def __call__(self, now: Optional[float] = None) -> bool:
        current = time.time() if now is None else now
        elapsed = current - self.prev
        due = elapsed >= self.interval
        if not due:
            return False

        periods, overshoot = divmod(elapsed, self.interval)
        skipped = periods - 1
        if skipped:
            log("Skipped:", skipped, overshoot, elapsed, self.interval)
        self.prev = current - overshoot
        return True
def try_read_parse(
    dev: BTLEUart,
    address: int,
    words: int = 1,
    parser: Optional[Callable] = None,
    attempts=5,
) -> Optional[dict]:
    """Read *words* words at *address* and parse them, retrying on failure.

    Args:
        dev: Connected device to read from.
        address: Address to read.
        words: Number of 16-bit words to request.
        parser: Callable applied to the raw payload; its result is returned.
        attempts: Maximum number of reads; zero or less means no read at all.

    Returns:
        The parser's result for the first payload it accepts, or ``None``
        when every attempt failed.

    NOTE(review): when *parser* is ``None`` a successfully read payload is
    not returned; the loop keeps reading until the attempts run out. This is
    kept as-is; confirm whether returning early was intended.
    """
    # "> 0" rather than truthiness, so a negative count cannot loop forever.
    while attempts > 0:
        attempts -= 1
        res = readMemory(dev, address, words)
        if res:
            try:
                if parser:
                    return parser(res)
            except struct.error as e:
                log(e)
                # Report the address actually read; this used to always
                # say 0x0100.
                log(f"0x{address:04X} Unpack error:", len(res), res)
                # Drain whatever is left so the next attempt starts clean.
                log("Flushed from read buffer; ", dev.read(timeout=0.5))
        else:
            log(f"No data read, expected {words*2} bytes (attempts left: {attempts})")
    return None
if __name__ == "__main__":
    # Connect to the controller, publish the historical data once, then poll
    # forever and forward every reading to the configured consumers.  The
    # connection is re-established whenever the BLE link drops.
    conf = get_config()
    consumers = get_consumers(conf)
    per_voltages = Periodical(interval=15)
    per_current_hist = Periodical(interval=60)
    try:
        while True:
            try:
                log("Connecting...")
                with BTLEUart(MAC, timeout=5) as dev:
                    log("Connected.")
                    # write(dev, construct_request(0, 32))
                    # Memory dump
                    # for address in range(0, 0x10000, 16):
                    #     log(f"Reading 0x{address:04X}...")
                    #     write(wd, construct_request(address, 16))

                    # Number of per-day history entries to fetch; replaced
                    # by the device's "run_days" value when it is available.
                    days = 7
                    res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
                    if res:
                        log(res)
                        for consumer in consumers:
                            consumer.write(res)
                        days = cast(int, res.get("run_days", 7))

                    for i in range(days):
                        res = try_read_parse(
                            dev, 0xF000 + i, 10, parse_historical_entry
                        )
                        if res:
                            log({i: res})
                            for consumer in consumers:
                                consumer.write({str(i): res})

                    while True:
                        now = time.time()
                        if per_voltages(now):
                            data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
                            if data:
                                data[DataName.CALCULATED_BATTERY_POWER] = float(
                                    Decimal(data.get(DataName.BATTERY_VOLTAGE, 0))
                                    * Decimal(data.get(DataName.BATTERY_CURRENT, 0))
                                )
                                data[DataName.CALCULATED_PANEL_POWER] = float(
                                    Decimal(data.get(DataName.PANEL_VOLTAGE, 0))
                                    * Decimal(data.get(DataName.PANEL_CURRENT, 0))
                                )
                                data[DataName.CALCULATED_LOAD_POWER] = float(
                                    Decimal(data.get(DataName.LOAD_VOLTAGE, 0))
                                    * Decimal(data.get(DataName.LOAD_CURRENT, 0))
                                )
                                log(data)
                                for consumer in consumers:
                                    consumer.write(data)
                        if per_current_hist(now):
                            data = try_read_parse(
                                dev, 0x010B, 21, parse_historical_entry
                            )
                            if data:
                                log(data)
                                for consumer in consumers:
                                    consumer.write(data)
                        # print(".")
                        for consumer in consumers:
                            consumer.poll()
                        # Sleep out the rest of this one-second tick.  The
                        # parentheses matter: "1 - time.time() - now" was
                        # always negative, so the loop never slept and spun
                        # at full speed.
                        time.sleep(max(0, 1 - (time.time() - now)))
                        # if STATUS.get('load_enabled'):
                        #     write(wd, CMD_DISABLE_LOAD)
                        # else:
                        #     write(wd, CMD_ENABLE_LOAD)
            except btle.BTLEDisconnectError:
                log("ERROR: Disconnected")
                time.sleep(1)
    except (KeyboardInterrupt, SystemExit, Exception) as e:
        # Always give consumers a chance to shut down; only Ctrl-C is
        # swallowed, everything else propagates.
        for consumer in consumers:
            consumer.exit()
        if not isinstance(e, KeyboardInterrupt):
            raise