#!/usr/bin/env python3 # -*- coding: utf-8 -*- import datetime import struct import sys import time from decimal import Decimal from io import RawIOBase from typing import Callable, Collection, Optional, cast from bluepy import btle from libscrc import modbus from .config import get_config, get_consumers from .feasycom_ble import BTLEUart from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem, DataName MAC = "DC:0D:30:9C:61:BA" # write_service = "0000ffd0-0000-1000-8000-00805f9b34fb" # read_service = "0000fff0-0000-1000-8000-00805f9b34fb" ACTION_READ = 0x03 ACTION_WRITE = 0x03 POSSIBLE_MARKER = (0x01, 0xFD, 0xFE, 0xFF) # get(255, 12, 2) # "ff 03 00 0c 00 02" CMD_GET_1 = b"\xff\x03\x00\x0c\x00\x02" # > ff 03 04 20 20 20 20 # get(255, 12, 8) # ff 03 00 0c 00 08 CMD_GET_MODEL = b"\xff\x03\x00\x0c\x00\x08" # > ff 03 10 20 20 20 20 4d 4c 32 34 32 30 20 20 20 20 20 20 # Device SKU: ML2420 # get(255, 20, 4) # ff 03 00 14 00 04 CMD_GET_VERSION = b"\xff\x03\x00\x14\x00\x04" # > ff 03 08 00 04 02 00 02 00 00 03 # CC ?? 11 22 33 ?? 44 55 66 # Version: 4.2.0 # get(255, 24, 3) # ff 03 00 18 00 03 CMD_GET_SERIAL = b"\xff\x03\x00\x18\x00\x03" # > ff 03 06 3c 13 02 67 00 01 # CC 11 22 33 33 ?? ?? # SN: 60-19-0615 # get(255, 256, 7) # ff 03 01 00 00 07 CMD_GET_BATTERY_STATE = b"\xff\x03\x01\x00\x00\x07" # > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03 # CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88 # 1: Battery charge: 72 % # 2: Battery voltage: 12.6 V # 3: Battery current: 0.29 A # 4: Internal temperature? # 5: External temperature probe for battery signet 8bit: 13 degC # 6: Load voltage: 12.6 V # 7: Load current: 0.28 A # 8: Load power: 3 W # get(255, 263, 4) # ff 03 01 07 00 04 CMD_GET_PANEL_STATUS = b"\xff\x03\x01\x07\x00\x04" # > ff 03 08 00 c8 00 14 00 04 00 01 # CC 11 11 22 22 33 33 ?? ?? # 1: Panel voltage: 20.0 V # 2: Panel current: 0.20 A # 3: Panel power: 4 W # Charging status? # set(255, 266, 1 or 0) # ff 06 01 0a 00 01 CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01" CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00" REG_LOAD_ENABLE = 0x010A # get(255, 267, 21) # ff 03 01 0b 00 15 CMD_GET_LOAD_PARAMETERS = b"\xff\x03\x01\x0b\x00\x15" # > ff 03 2a 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00 # > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 # > 00 00 00 00 00 # get(255, 288, 3) # ff 03 01 20 00 03 CMD_GET_2 = b"\xff\x03\x01\x20\x00\x03" # > ff 03 06 80 02 00 00 00 00 # CC 11 22 33 33 33 33 # 1: boolean flag?: 1 # 2: ?: 2 # 3: ?: 0 # get(255, 57345, 33) # ff 03 e0 01 00 21 CMD_GET_BATTERY_PARAMETERS = b"\xff\x03\xe0\x01\x00\x21" # > ff 03 42 07 d0 00 c8 ff 0c 00 02 00 a0 00 9b 00 92 00 90 00 # > 8a 00 84 00 7e 00 78 00 6f 00 6a 64 32 00 05 00 78 00 78 00 # > 1e 00 03 00 41 00 a3 00 4b 00 a3 00 00 00 00 00 00 00 00 00 # > 0f 00 05 00 05 00 04 01 00 # 33 * uint16 # get(1, 61440, 10) # 01 03 f0 00 00 0a CMD_GET_HISTORICAL_TODAY = b"\x01\x03\xf0\x00\x00\x0a" CMD_GET_HISTORICAL_YESTERDAY = b"\x01\x03\xf0\x01\x00\x0a" CMD_GET_HISTORICAL_D2 = b"\x01\x03\xf0\x02\x00\x0a" CMD_GET_HISTORICAL_D3 = b"\x01\x03\xf0\x03\x00\x0a" # ,- battery_min_voltage # | ,- battery_max_voltage # | | ,- ?1 max charge %? # | | | ,- ?2 # | | | | ,- charge_max_power # | | | | | ,- discharge_max_power # | | | | | | ,- charge_amp_hour # | | | | | | | ,- discharge_amp_hour # | | | | | | | | ,- production_power # | | | | | | | | | ,- consumption_power # _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ # > 01 03 14 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00 00 00 00 # > 01 03 14 00 7c 00 7f 00 53 00 20 00 0a 00 03 00 00 00 00 00 00 00 00 # battery_min_voltage = 12.4 V # battery_max_voltage = 12.7 V # ?1 = 83 % ? # ?2 = # charge_max_power = 10 W # discharge_max_power = 3 W # charge_amp_hour = 0 Ah # discharge_amp_hour = 0 Ah # production_power = 0 Wh # consumption_power = 0 Wh # ff 78 00 00 00 01 CMD_ = b"\xff\x78\x00\x00\x00\x01" # CMD_GET_BATTERY_STATE = b'\xff\x03\x01\x00\x00\x07' # > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03 # CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88 # 1: Battery charge: 72 % # 2: Battery voltage: 12.6 V # 3: Battery current: 0.29 A # 4: Internal temperature? # 5: External temperature probe for battery signed 8bit: 13 degC # 6: Load voltage: 12.6 V # 7: Load current: 0.28 A # 8: Load power: 3 W # CMD_GET_PANEL_STATUS = b'\xff\x03\x01\x07\x00\x04' # > ff 03 08 00 c8 00 14 00 04 00 01 # CC 11 11 22 22 33 33 ?? ?? # > ff 03 08 00 00 00 00 00 00 00 00 # 1: Panel voltage: 20.0 V # 2: Panel current: 0.20 A # 3: Panel power: 4 W # ?: load_enabled # Only factor of 1000 SI_PREFIXES_LARGE = "kMGTPEZY" SI_PREFIXES_SMALL = "mµnpfazy" def humanize_number(data, unit: str = ""): counter = 0 while data >= 1000: data /= 1000 counter += 1 if counter >= len(SI_PREFIXES_LARGE): break while data < 1: data *= 1000 counter -= 1 if abs(counter) >= len(SI_PREFIXES_SMALL): break if not counter: prefix = "" elif counter > 0: prefix = SI_PREFIXES_LARGE[counter - 1] elif counter < 0: prefix = SI_PREFIXES_SMALL[abs(counter) - 1] return f"{data:.3g} {prefix}{unit}" def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict: pos = offset res = {} for i in items: res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0]) pos += i.st_size return res # GET_BATTERY_STATE def parse_battery_state(data: bytes) -> dict: return parse(data, DATA_BATTERY_STATE) def parse_historical_entry(data: bytes) -> dict: res = parse(data, HISTORICAL_DATA[:10]) res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]]) if len(data) > res_datalen: res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen)) return res def write(fh, data): bdata = bytes(data) crc = modbus(bdata) bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8]) fh.write(data + bcrc) def construct_request(address, words=1, action=ACTION_READ, marker=0xFF): assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}" return struct.pack("!BBHH", marker, action, address, words) def log(*message: object, **kwargs): print(datetime.datetime.utcnow().isoformat(" "), *message, **kwargs) sys.stdout.flush() def parse_packet(data): tag, operation, size = struct.unpack_from("BBB", data) _unpacked = struct.unpack_from(f"<{size}BH", data, offset=3) crc = _unpacked[-1] payload = _unpacked[:-1] calculated_crc = modbus(bytes([tag, operation, size, *payload])) if crc != calculated_crc: e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.") e.tag = tag e.operation = operation e.size = size e.payload = payload e.crc = crc e.calculated_crc = calculated_crc raise e return payload def discardUntil(fh: RawIOBase, byte: int, timeout=10) -> Optional[int]: assert byte >= 0 and byte < 256, f"byte: Expected 8bit unsigned int, got {byte}" def expand(b: Optional[bytes]): if b is None: return b return b[0] start = time.time() discarded = 0 read_byte = expand(fh.read(1)) while read_byte != byte: if read_byte is not None: if not discarded: log("Discarding", end="") discarded += 1 print(f" {read_byte:02X}", end="") sys.stdout.flush() if time.time() - start > timeout: read_byte = None break read_byte = expand(fh.read(1)) if discarded: print() sys.stdout.flush() return read_byte def readMemory(fh: RawIOBase, address: int, words: int = 1) -> Optional[bytes]: # log(f"Reading {words} words from 0x{address:04X}") request = construct_request(address, words=words) # log("Request:", request) write(fh, request) tag = discardUntil(fh, 0xFF) if tag is None: return None header = fh.read(2) if header and len(header) == 2: operation, size = header data = fh.read(size) _crc = fh.read(2) if data and _crc: try: crc = struct.unpack_from(" bool: if now is None: now = time.time() if (now - self.prev) >= self.interval: skipped, overshoot = divmod(now - self.prev, self.interval) skipped -= 1 if skipped: log("Skipped:", skipped, overshoot, now - self.prev, self.interval) self.prev = now - overshoot return True return False def try_read_parse( dev: BTLEUart, address: int, words: int = 1, parser: Callable = None, attempts=5, ) -> Optional[dict]: while attempts: attempts -= 1 res = readMemory(dev, address, words) if res: try: if parser: return parser(res) except struct.error as e: log(e) log("0x0100 Unpack error:", len(res), res) log("Flushed from read buffer; ", dev.read(timeout=0.5)) else: log(f"No data read, expected {words*2} bytes (attempts left: {attempts})") return None if __name__ == "__main__": conf = get_config() consumers = get_consumers(conf) per_voltages = Periodical(interval=15) per_current_hist = Periodical(interval=60) try: while True: try: log("Connecting...") with BTLEUart(MAC, timeout=5) as dev: log("Connected.") # write(dev, construct_request(0, 32)) # Memory dump # for address in range(0, 0x10000, 16): # log(f"Reading 0x{address:04X}...") # write(wd, construct_request(address, 16)) days = 7 res = try_read_parse(dev, 0x010B, 21, parse_historical_entry) if res: log(res) for consumer in consumers: consumer.write(res) days = cast(int, res.get("run_days", 7)) for i in range(days): res = try_read_parse( dev, 0xF000 + i, 10, parse_historical_entry ) if res: log({i: res}) for consumer in consumers: consumer.write({str(i): res}) while True: now = time.time() if per_voltages(now): data = try_read_parse(dev, 0x0100, 11, parse_battery_state) if data: data[DataName.CALCULATED_BATTERY_POWER] = float( Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0))) * Decimal( str(data.get(DataName.BATTERY_CURRENT, 0)) ) ) data[DataName.CALCULATED_PANEL_POWER] = float( Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0))) * Decimal(str(data.get(DataName.PANEL_CURRENT, 0))) ) data[DataName.CALCULATED_LOAD_POWER] = float( Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0))) * Decimal(str(data.get(DataName.LOAD_CURRENT, 0))) ) log(data) for consumer in consumers: consumer.write(data) if per_current_hist(now): data = try_read_parse( dev, 0x010B, 21, parse_historical_entry ) if data: log(data) for consumer in consumers: consumer.write(data) # print(".") for consumer in consumers: consumer.poll() time.sleep(max(0, 1 - time.time() - now)) # if STATUS.get('load_enabled'): # write(wd, CMD_DISABLE_LOAD) # else: # write(wd, CMD_ENABLE_LOAD) except btle.BTLEDisconnectError: log("ERROR: Disconnected") time.sleep(1) except (KeyboardInterrupt, SystemExit, Exception) as e: for consumer in consumers: consumer.exit() if type(e) is not KeyboardInterrupt: raise