#!/usr/bin/env python3 # -*- coding: utf-8 -*- import datetime import struct import sys import time from io import RawIOBase from typing import Callable, Collection, Optional, cast from bluepy import btle from libscrc import modbus from feasycom_ble import BTLEUart from test_config import get_config, get_consumers MAC = "DC:0D:30:9C:61:BA" # write_service = "0000ffd0-0000-1000-8000-00805f9b34fb" # read_service = "0000fff0-0000-1000-8000-00805f9b34fb" ACTION_READ = 0x03 ACTION_WRITE = 0x03 POSSIBLE_MARKER = (0x01, 0xFD, 0xFE, 0xFF) # get(255, 12, 2) # "ff 03 00 0c 00 02" CMD_GET_1 = b"\xff\x03\x00\x0c\x00\x02" # > ff 03 04 20 20 20 20 # get(255, 12, 8) # ff 03 00 0c 00 08 CMD_GET_MODEL = b"\xff\x03\x00\x0c\x00\x08" # > ff 03 10 20 20 20 20 4d 4c 32 34 32 30 20 20 20 20 20 20 # Device SKU: ML2420 # get(255, 20, 4) # ff 03 00 14 00 04 CMD_GET_VERSION = b"\xff\x03\x00\x14\x00\x04" # > ff 03 08 00 04 02 00 02 00 00 03 # CC ?? 11 22 33 ?? 44 55 66 # Version: 4.2.0 # get(255, 24, 3) # ff 03 00 18 00 03 CMD_GET_SERIAL = b"\xff\x03\x00\x18\x00\x03" # > ff 03 06 3c 13 02 67 00 01 # CC 11 22 33 33 ?? ?? # SN: 60-19-0615 # get(255, 256, 7) # ff 03 01 00 00 07 CMD_GET_BATTERY_STATE = b"\xff\x03\x01\x00\x00\x07" # > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03 # CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88 # 1: Battery charge: 72 % # 2: Battery voltage: 12.6 V # 3: Battery current: 0.29 A # 4: Internal temperature? # 5: External temperature probe for battery signet 8bit: 13 degC # 6: Load voltage: 12.6 V # 7: Load current: 0.28 A # 8: Load power: 3 W # get(255, 263, 4) # ff 03 01 07 00 04 CMD_GET_PANEL_STATUS = b"\xff\x03\x01\x07\x00\x04" # > ff 03 08 00 c8 00 14 00 04 00 01 # CC 11 11 22 22 33 33 ?? ?? # 1: Panel voltage: 20.0 V # 2: Panel current: 0.20 A # 3: Panel power: 4 W # Charging status? # set(255, 266, 1 or 0) # ff 06 01 0a 00 01 CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01" CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00" REG_LOAD_ENABLE = 0x010A # get(255, 267, 21) # ff 03 01 0b 00 15 CMD_GET_LOAD_PARAMETERS = b"\xff\x03\x01\x0b\x00\x15" # > ff 03 2a 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00 # > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 # > 00 00 00 00 00 # get(255, 288, 3) # ff 03 01 20 00 03 CMD_GET_2 = b"\xff\x03\x01\x20\x00\x03" # > ff 03 06 80 02 00 00 00 00 # CC 11 22 33 33 33 33 # 1: boolean flag?: 1 # 2: ?: 2 # 3: ?: 0 # get(255, 57345, 33) # ff 03 e0 01 00 21 CMD_GET_BATTERY_PARAMETERS = b"\xff\x03\xe0\x01\x00\x21" # > ff 03 42 07 d0 00 c8 ff 0c 00 02 00 a0 00 9b 00 92 00 90 00 # > 8a 00 84 00 7e 00 78 00 6f 00 6a 64 32 00 05 00 78 00 78 00 # > 1e 00 03 00 41 00 a3 00 4b 00 a3 00 00 00 00 00 00 00 00 00 # > 0f 00 05 00 05 00 04 01 00 # 33 * uint16 # get(1, 61440, 10) # 01 03 f0 00 00 0a CMD_GET_HISTORICAL_TODAY = b"\x01\x03\xf0\x00\x00\x0a" CMD_GET_HISTORICAL_YESTERDAY = b"\x01\x03\xf0\x01\x00\x0a" CMD_GET_HISTORICAL_D2 = b"\x01\x03\xf0\x02\x00\x0a" CMD_GET_HISTORICAL_D3 = b"\x01\x03\xf0\x03\x00\x0a" # ,- battery_min_voltage # | ,- battery_max_voltage # | | ,- ?1 max charge %? # | | | ,- ?2 # | | | | ,- charge_max_power # | | | | | ,- discharge_max_power # | | | | | | ,- charge_amp_hour # | | | | | | | ,- discharge_amp_hour # | | | | | | | | ,- production_power # | | | | | | | | | ,- consumption_power # _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ # > 01 03 14 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00 00 00 00 # > 01 03 14 00 7c 00 7f 00 53 00 20 00 0a 00 03 00 00 00 00 00 00 00 00 # battery_min_voltage = 12.4 V # battery_max_voltage = 12.7 V # ?1 = 83 % ? # ?2 = # charge_max_power = 10 W # discharge_max_power = 3 W # charge_amp_hour = 0 Ah # discharge_amp_hour = 0 Ah # production_power = 0 Wh # consumption_power = 0 Wh # ff 78 00 00 00 01 CMD_ = b"\xff\x78\x00\x00\x00\x01" # CMD_GET_BATTERY_STATE = b'\xff\x03\x01\x00\x00\x07' # > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03 # CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88 # 1: Battery charge: 72 % # 2: Battery voltage: 12.6 V # 3: Battery current: 0.29 A # 4: Internal temperature? # 5: External temperature probe for battery signed 8bit: 13 degC # 6: Load voltage: 12.6 V # 7: Load current: 0.28 A # 8: Load power: 3 W # CMD_GET_PANEL_STATUS = b'\xff\x03\x01\x07\x00\x04' # > ff 03 08 00 c8 00 14 00 04 00 01 # CC 11 11 22 22 33 33 ?? ?? # > ff 03 08 00 00 00 00 00 00 00 00 # 1: Panel voltage: 20.0 V # 2: Panel current: 0.20 A # 3: Panel power: 4 W # ?: load_enabled # Only factor of 1000 SI_PREFIXES_LARGE = "kMGTPEZY" SI_PREFIXES_SMALL = "mµnpfazy" def humanize_number(data, unit: str = ""): counter = 0 while data >= 1000: data /= 1000 counter += 1 if counter >= len(SI_PREFIXES_LARGE): break while data < 1: data *= 1000 counter -= 1 if abs(counter) >= len(SI_PREFIXES_SMALL): break if not counter: prefix = "" elif counter > 0: prefix = SI_PREFIXES_LARGE[counter - 1] elif counter < 0: prefix = SI_PREFIXES_SMALL[abs(counter) - 1] return f"{data:.3g} {prefix}{unit}" class DataItem: name: str st_format: str unit: Optional[str] transformation: Optional[Callable] def __init__( self, name: str, st_format: str, unit: Optional[str] = None, transform: Optional[Callable] = None, ): self.name = name self.st_format = st_format self.unit = unit self.transformation = transform if self.st_format[0] not in "@=<>!": self.st_format = "!" + self.st_format @property def st_size(self) -> int: return struct.calcsize(self.st_format) def transform(self, data): if self.transformation is None: return data return self.transformation(data) def parse_temperature(bin): if bin & 0x80: return (bin & 0x7F) * -1 return bin & 0x7F DATA_BATTERY_STATE = [ DataItem("battery_charge", "H", "%"), DataItem("battery_voltage", "H", "V", lambda n: n / 10), DataItem("battery_current", "H", "A", lambda n: n / 100), DataItem("internal_temperature", "B", "°C", parse_temperature), DataItem("battery_temperature", "B", "°C", parse_temperature), DataItem("load_voltage", "H", "V", lambda n: n / 10), DataItem("load_current", "H", "A", lambda n: n / 100), DataItem("load_power", "H", "W"), DataItem("panel_voltage", "H", "V", lambda n: n / 10), DataItem("panel_current", "H", "A", lambda n: n / 100), DataItem("panel_power", "H", "W"), DataItem("load_enabled", "x?", transform=bool), ] def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict: pos = offset res = {} for i in items: res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0]) pos += i.st_size return res # GET_BATTERY_STATE def parse_battery_state(data: bytes) -> dict: return parse(data, DATA_BATTERY_STATE) HISTORICAL_DATA = [ DataItem("battery_voltage_min", "H", "V", lambda n: n / 10), DataItem("battery_voltage_max", "H", "V", lambda n: n / 10), DataItem("charge_max_current", "H", "A", lambda n: n / 100), DataItem("_discharge_max_current?", "H", "A", lambda n: n / 100), DataItem("charge_max_power", "H", "W"), DataItem("discharge_max_power", "H", "W"), DataItem("charge_amp_hour", "H", "Ah"), DataItem("discharge_amp_hour", "H", "Ah"), DataItem("production_power", "H", "Wh"), DataItem("consumption_power", "H", "Wh"), DataItem("run_days", "H"), DataItem("discharge_count", "H"), DataItem("full_charge_count", "H"), DataItem("total_charge_amp_hours", "L", "Ah"), DataItem("total_discharge_amp_hours", "L", "Ah"), DataItem("total_production_power", "L", "Wh"), DataItem("total_consumption_power", "L", "Wh"), ] def parse_historical_entry(data: bytes) -> dict: res = parse(data, HISTORICAL_DATA[:10]) res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]]) if len(data) > res_datalen: res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen)) return res def write(fh, data): bdata = bytes(data) crc = modbus(bdata) bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8]) fh.write(data + bcrc) def construct_request(address, words=1, action=ACTION_READ, marker=0xFF): assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}" return struct.pack("!BBHH", marker, action, address, words) def log(*message: object): print(datetime.datetime.utcnow().isoformat(" "), *message) sys.stdout.flush() def parse_packet(data): tag, operation, size = struct.unpack_from("BBB", data) _unpacked = struct.unpack_from(f"<{size}BH", data, offset=3) crc = _unpacked[-1] payload = _unpacked[:-1] calculated_crc = modbus(bytes([tag, operation, size, *payload])) if crc != calculated_crc: e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.") e.tag = tag e.operation = operation e.size = size e.payload = payload e.crc = crc e.calculated_crc = calculated_crc raise e return payload def readMemory(fh: RawIOBase, address: int, words: int = 1): # log(f"Reading {words} words from 0x{address:04X}") write(fh, construct_request(address, words=words)) header = fh.read(3) if header and len(header) == 3: tag, operation, size = header data = fh.read(size) _crc = fh.read(2) if data and _crc: crc = struct.unpack_from(" bool: if now is None: now = time.time() if (now - self.prev) >= self.interval: skipped, overshoot = divmod(now - self.prev, self.interval) skipped -= 1 if skipped: log("Skipped:", skipped, overshoot, now - self.prev, self.interval) self.prev = now - overshoot return True return False if __name__ == "__main__": conf = get_config() consumers = get_consumers(conf) per_voltages = Periodical(interval=15) per_current_hist = Periodical(interval=60) try: while True: try: log("Connecting...") with BTLEUart(MAC, timeout=10) as dev: log("Connected.") # write(dev, construct_request(0, 32)) # Memory dump # for address in range(0, 0x10000, 16): # log(f"Reading 0x{address:04X}...") # write(wd, construct_request(address, 16)) days = 7 res = readMemory(dev, 0x010B, 21) if res: d = parse_historical_entry(res) log(d) for consumer in consumers: consumer.write(d) days = cast(int, d.get("run_days", 7)) for i in range(days): res = readMemory(dev, 0xF000 + i, 10) if res: d = parse_historical_entry(res) log({i: d}) for consumer in consumers: consumer.write({str(i): d}) while True: now = time.time() if per_voltages(now): # CMD_GET_BATTERY_STATE + CMD_GET_PANEL_STATUS res = readMemory(dev, 0x0100, 11) if res: try: d = parse_battery_state(res) log(d) for consumer in consumers: consumer.write(d) except struct.error as e: log(e) log("0x0100 Unpack error:", len(res), res) log( "Flushed from read buffer; ", dev.read(timeout=0.5), ) if per_current_hist(now): res = readMemory(dev, 0x010B, 21) if res: try: d = parse_historical_entry(res) log(d) for consumer in consumers: consumer.write(d) except struct.error as e: log(e) log("0x010B Unpack error:", len(res), res) log( "Flushed from read buffer; ", dev.read(timeout=0.5), ) # print(".") for consumer in consumers: consumer.poll() time.sleep(max(0, 1 - time.time() - now)) # if STATUS.get('load_enabled'): # write(wd, CMD_DISABLE_LOAD) # else: # write(wd, CMD_ENABLE_LOAD) except btle.BTLEDisconnectError: log("ERROR: Disconnected") time.sleep(1) except (KeyboardInterrupt, SystemExit, Exception) as e: for consumer in consumers: consumer.exit() if type(e) is not KeyboardInterrupt: raise