#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Poll a solar charge controller over Bluetooth LE with Modbus-style frames.

Requests are six-byte frames (marker, function code, register address, word
count) followed by a CRC16/MODBUS checksum sent low byte first.  Responses
arrive as BLE notifications, possibly split over several notifications, and
are reassembled until their trailing checksum matches.

The byte dumps in the comments below are captures from a device that reports
itself as SKU ML2420.
"""

import datetime
import io
import queue
import struct
import sys
import time
from queue import Queue
from typing import TYPE_CHECKING, Optional, cast

if TYPE_CHECKING:
    from _typeshed import ReadableBuffer, WriteableBuffer

from bluepy import btle
from libscrc import modbus

MAC = "DC:0D:30:9C:61:BA"
INTERVAL = 15  # seconds between two status polls in main()

# write_service = "0000ffd0-0000-1000-8000-00805f9b34fb"
# read_service = "0000fff0-0000-1000-8000-00805f9b34fb"

WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb"
READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb"

ACTION_READ = 0x03
# Function code carried by the set() frames (see CMD_ENABLE_LOAD: ff 06 ...).
ACTION_WRITE = 0x06

POSSIBLE_MARKER = (0x01, 0xFD, 0xFE, 0xFF)

# Shortest frame worth checking: marker, function code and two CRC bytes.
MIN_FRAME_SIZE = 4
# Upper bound for a reassembled frame (Modbus RTU limits a frame to 256
# bytes); a longer buffer can only be the result of a lost or damaged chunk.
MAX_FRAME_SIZE = 256

# get(255, 12, 2)
# "ff 03 00 0c 00 02"
CMD_GET_1 = b"\xff\x03\x00\x0c\x00\x02"
# > ff 03 04 20 20 20 20

# get(255, 12, 8)
# ff 03 00 0c 00 08
CMD_GET_MODEL = b"\xff\x03\x00\x0c\x00\x08"
# > ff 03 10 20 20 20 20 4d 4c 32 34 32 30 20 20 20 20 20 20
# Device SKU: ML2420

# get(255, 20, 4)
# ff 03 00 14 00 04
CMD_GET_VERSION = b"\xff\x03\x00\x14\x00\x04"
# > ff 03 08 00 04 02 00 02 00 00 03
#         CC ?? 11 22 33 ?? 44 55 66
# Version: 4.2.0

# get(255, 24, 3)
# ff 03 00 18 00 03
CMD_GET_SERIAL = b"\xff\x03\x00\x18\x00\x03"
# > ff 03 06 3c 13 02 67 00 01
#         CC 11 22 33 33 ?? ??
# SN: 60-19-0615

# get(255, 256, 7)
# ff 03 01 00 00 07
CMD_GET_BATTERY_STATE = b"\xff\x03\x01\x00\x00\x07"
# > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03
#         CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88
# 1: Battery charge: 72 %
# 2: Battery voltage: 12.6 V
# 3: Battery current: 0.29 A
# 4: Internal temperature?
# 5: External temperature probe for battery signed 8bit: 13 degC
# 6: Load voltage: 12.6 V
# 7: Load current: 0.28 A
# 8: Load power: 3 W

# get(255, 263, 4)
# ff 03 01 07 00 04
CMD_GET_PANEL_STATUS = b"\xff\x03\x01\x07\x00\x04"
# > ff 03 08 00 c8 00 14 00 04 00 01
#         CC 11 11 22 22 33 33 ?? ??
# 1: Panel voltage: 20.0 V
# 2: Panel current: 0.20 A
# 3: Panel power: 4 W
# Charging status?

# set(255, 266, 1 or 0)
# ff 06 01 0a 00 01
CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01"
CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00"
REG_LOAD_ENABLE = 0x010A

# get(255, 267, 21)
# ff 03 01 0b 00 15
CMD_GET_LOAD_PARAMETERS = b"\xff\x03\x01\x0b\x00\x15"
# > ff 03 2a 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00
# > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
# > 00 00 00 00 00

# get(255, 288, 3)
# ff 03 01 20 00 03
CMD_GET_2 = b"\xff\x03\x01\x20\x00\x03"
# > ff 03 06 80 02 00 00 00 00
#         CC 11 22 33 33 33 33
# 1: boolean flag?: 1
# 2: ?: 2
# 3: ?: 0

# get(255, 57345, 33)
# ff 03 e0 01 00 21
CMD_GET_BATTERY_PARAMETERS = b"\xff\x03\xe0\x01\x00\x21"
# > ff 03 42 07 d0 00 c8 ff 0c 00 02 00 a0 00 9b 00 92 00 90 00
# > 8a 00 84 00 7e 00 78 00 6f 00 6a 64 32 00 05 00 78 00 78 00
# > 1e 00 03 00 41 00 a3 00 4b 00 a3 00 00 00 00 00 00 00 00 00
# > 0f 00 05 00 05 00 04 01 00
# 33 * uint16

# get(1, 61440, 10)
# 01 03 f0 00 00 0a
CMD_GET_HISTORICAL_TODAY = b"\x01\x03\xf0\x00\x00\x0a"
CMD_GET_HISTORICAL_YESTERDAY = b"\x01\x03\xf0\x01\x00\x0a"
CMD_GET_HISTORICAL_D2 = b"\x01\x03\xf0\x02\x00\x0a"
CMD_GET_HISTORICAL_D3 = b"\x01\x03\xf0\x03\x00\x0a"
#              ,- battery_min_voltage
#              |     ,- battery_max_voltage
#              |     |     ,- ?1 max charge %?
#              |     |     |     ,- ?2
#              |     |     |     |     ,- charge_max_power
#              |     |     |     |     |     ,- discharge_max_power
#              |     |     |     |     |     |     ,- charge_amp_hour
#              |     |     |     |     |     |     |     ,- discharge_amp_hour
#              |     |     |     |     |     |     |     |     ,- production_power
#              |     |     |     |     |     |     |     |     |     ,- consumption_power
#             _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___
# > 01 03 14 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00 00 00 00
# > 01 03 14 00 7c 00 7f 00 53 00 20 00 0a 00 03 00 00 00 00 00 00 00 00
# battery_min_voltage = 12.4 V
# battery_max_voltage = 12.7 V
# ?1 = 83 % ?
# ?2 =
# charge_max_power = 10 W
# discharge_max_power = 3 W
# charge_amp_hour = 0 Ah
# discharge_amp_hour = 0 Ah
# production_power = 0 Wh
# consumption_power = 0 Wh

# ff 78 00 00 00 01
CMD_ = b"\xff\x78\x00\x00\x00\x01"

# Response layouts decoded by parsePacket():
#
# CMD_GET_BATTERY_STATE = b'\xff\x03\x01\x00\x00\x07'
# > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03
#         CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88
# 1: Battery charge: 72 %
# 2: Battery voltage: 12.6 V
# 3: Battery current: 0.29 A
# 4: Internal temperature?
# 5: External temperature probe for battery signed 8bit: 13 degC
# 6: Load voltage: 12.6 V
# 7: Load current: 0.28 A
# 8: Load power: 3 W
#
# CMD_GET_PANEL_STATUS = b'\xff\x03\x01\x07\x00\x04'
# > ff 03 08 00 c8 00 14 00 04 00 01
#         CC 11 11 22 22 33 33 ?? ??
# > ff 03 08 00 00 00 00 00 00 00 00
# 1: Panel voltage: 20.0 V
# 2: Panel current: 0.20 A
# 3: Panel power: 4 W
# ?: load_enabled

# Last decoded values, merged across all packets seen so far.
STATUS = {}

# struct layouts for whole frames: 3 header bytes, payload, 2 CRC bytes.
_BATTERY_STATE_FORMAT = "!xxxHHHbbHHHxx"
_BATTERY_STATE_FIELDS = (
    "battery_charge",
    "battery_voltage",
    "battery_current",
    "_internal_temperature?",
    "battery_temperature",
    "load_voltage",
    "load_current",
    "load_power",
)
_PANEL_STATUS_FORMAT = "!xxxHHHx?xx"
_PANEL_STATUS_FIELDS = (
    "panel_voltage",
    "panel_current",
    "panel_power",
    "load_enabled",
)
# Echo of a set() frame: marker, function, register (2), pad, flag, CRC (2).
_LOAD_ENABLE_FORMAT = "!xxxxx?xx"


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO string, space separated."""
    now = datetime.datetime.now(datetime.timezone.utc)
    # Dropping tzinfo keeps the log format free of a "+00:00" suffix.
    return now.replace(tzinfo=None).isoformat(" ")


def parsePacket(data):
    """Decode one complete, CRC-checked frame and print it with a timestamp.

    Three frames are understood: the battery-state response (read, byte
    count 0x0E), the panel-status response (read, byte count 0x08 -- the
    version response has the same byte count and is decoded the same way)
    and the echo of a load-enable write (third byte 0x01).  Decoded values
    are merged into the module-level ``STATUS`` dict.

    Args:
        data: The whole frame including header and trailing CRC.

    Returns:
        The dict of decoded fields, or ``None`` when the frame is not one of
        the recognised kinds (the raw frame is printed instead).

    Raises:
        struct.error: If a recognised frame does not have the expected length.
    """
    timestamp = _utc_timestamp()
    # data[0] is the marker/prefix byte; it is not needed for dispatching.
    operation = data[1]
    cc = data[2]

    res = None
    if operation == ACTION_READ and cc == 0x0E:
        # GET_BATTERY_STATE
        res = dict(
            zip(_BATTERY_STATE_FIELDS, struct.unpack(_BATTERY_STATE_FORMAT, data))
        )
        res["battery_voltage"] /= 10
        res["battery_current"] /= 100
        res["load_voltage"] /= 10
        res["load_current"] /= 100
    elif operation == ACTION_READ and cc == 0x08:
        # GET_PANEL_STATUS (OR version)
        res = dict(
            zip(_PANEL_STATUS_FIELDS, struct.unpack(_PANEL_STATUS_FORMAT, data))
        )
        res["panel_voltage"] /= 10
        res["panel_current"] /= 100
    elif operation == ACTION_WRITE and cc == 1:
        res = dict(zip(("load_enabled",), struct.unpack(_LOAD_ENABLE_FORMAT, data)))

    if res:
        STATUS.update(res)
        print(timestamp, res)
    else:
        print(timestamp, data)
    sys.stdout.flush()
    return res


class Delegate(btle.DefaultDelegate):
    """Notification delegate that reassembles frames and parses them.

    Notification payloads are appended to ``data`` until the last two bytes
    equal the CRC16/MODBUS of everything before them; the frame is then
    handed to ``parsePacket`` and the buffer is emptied.
    """

    data: bytearray

    def __init__(self):
        super().__init__()
        # Per-instance buffer: a class-level bytearray would be shared by
        # every delegate.
        self.data = bytearray()

    def handleNotification(self, cHandle, data):
        """Append *data* to the buffer and parse it once the CRC matches."""
        self.data.extend(data)
        if len(self.data) < MIN_FRAME_SIZE:
            # Too short to hold a header plus checksum; wait for more.
            return
        c_crc = modbus(bytes(self.data[:-2]))
        # byte order is inverted in regards to libscrc output
        d_crc = self.data[-1] << 8 | self.data[-2]
        if c_crc == d_crc:
            parsePacket(self.data)
            self.data.clear()
        elif len(self.data) > MAX_FRAME_SIZE:
            # A chunk was lost or damaged; without a reset no later frame
            # could ever produce a matching checksum.
            self.data.clear()


class BTLEUart(io.RawIOBase):
    """Raw, unseekable byte stream over a pair of BLE characteristics.

    Bytes written go to the characteristic identified by ``write_endpoint``;
    bytes read are the payloads of the notifications received from the
    device.  The connection is opened by the constructor and re-opened on
    demand after it has been dropped.
    """

    mac: str
    write_endpoint: str
    read_endpoint: str
    timeout: float  # default read timeout in seconds
    device: Optional[btle.Peripheral] = None
    _write_handle: Optional[btle.Characteristic] = None
    _read_handle: Optional[btle.Characteristic] = None
    delegate: "_QueueDelegate"
    _read_buffer: bytearray

    class _QueueDelegate(btle.DefaultDelegate):
        """Delegate that pushes notification payloads onto a queue."""

        queue: Queue
        handle: Optional[int]

        def __init__(self, queue, handle=None):
            super().__init__()
            self.queue = queue
            self.handle = handle

        def handleNotification(self, cHandle: int, data: bytes):
            """Queue *data* unless a handle filter is set and does not match."""
            if self.handle is not None and cHandle != self.handle:
                return
            self.queue.put(data)

    def __init__(
        self,
        mac: str,
        write_endpoint: str = WRITE_DEVICE,
        read_endpoint: str = READ_DEVICE,
        timeout: float = 30,
    ):
        """Connect to the device at *mac*.

        Args:
            mac: Address of the peripheral.
            write_endpoint: UUID of the characteristic to write to.
            read_endpoint: UUID of the characteristic looked up for reading.
            timeout: Default number of seconds a read waits for data.
        """
        super().__init__()
        self.mac = mac
        self.write_endpoint = write_endpoint
        self.read_endpoint = read_endpoint
        self.timeout = timeout
        self.delegate = self._QueueDelegate(Queue())
        self._read_buffer = bytearray()
        self._connect()

    def _ensure_connected(self) -> "btle.Peripheral":
        """Connect if there is no device yet and return the device."""
        if self.device is None:
            self._connect()
        return cast(btle.Peripheral, self.device)

    def _disconnect(self):
        """Forget the device and its handles, then disconnect it (if any)."""
        device, self.device = self.device, None
        self._read_handle = None
        self._write_handle = None
        if device is not None:
            device.disconnect()

    def _poll(self, timeout: float = 0.0001):
        """Let the device deliver notifications for roughly *timeout* seconds.

        Returns as soon as one wait ends without a notification, or once
        *timeout* has elapsed.
        """
        device = self._ensure_connected()
        deadline = time.monotonic() + timeout
        remaining = timeout
        # A zero wait is replaced by a tiny positive one.
        while device.waitForNotifications(max(remaining, 0) or 0.0001):
            remaining = deadline - time.monotonic()
            if remaining < 0:
                break

    def _connect(self):
        """Open a new connection and look up both characteristics."""
        if self.device is not None:
            self._disconnect()
        self.device = btle.Peripheral(self.mac).withDelegate(self.delegate)
        self._read_handle = self.device.getCharacteristics(uuid=self.read_endpoint)[0]
        # Handle filtering is left disabled (delegate.handle stays None), so
        # every notification is queued:
        # self.delegate.handle = self._read_handle.handle
        self._write_handle = self.device.getCharacteristics(uuid=self.write_endpoint)[0]

    def _pump(self) -> bool:
        """Poll once, then move at most one queued chunk into the buffer.

        Returns:
            True if a chunk was moved, False if the queue was empty.
        """
        self._poll()
        try:
            chunk = self.delegate.queue.get_nowait()
        except queue.Empty:
            return False
        self._read_buffer.extend(chunk)
        return True

    def _read(self, num: Optional[int] = None, timeout: Optional[float] = None):
        """Read up to *num* bytes, or everything available when *num* is None.

        Waits until *num* bytes (one byte when *num* is None) are buffered or
        *timeout* seconds have passed, whichever comes first.

        Args:
            num: Maximum number of bytes to return; None drains everything
                that has been received.
            timeout: Seconds to wait; None uses ``self.timeout``.

        Returns:
            The bytes read, or None when nothing was received in time.
        """
        self._ensure_connected()
        if timeout is None:
            timeout = self.timeout

        wanted = 1 if num is None else num
        deadline = time.monotonic() + timeout
        while len(self._read_buffer) < wanted and time.monotonic() <= deadline:
            self._pump()

        if num is None:
            # Collect whatever else is already on its way.
            while self._pump():
                pass
            data = bytes(self._read_buffer)
            self._read_buffer.clear()
        else:
            data = bytes(self._read_buffer[:num])
            del self._read_buffer[:num]
        return data or None

    def readinto(self, buffer: "WriteableBuffer") -> Optional[int]:
        """Fill *buffer* with received bytes.

        Returns:
            The number of bytes stored, or None if nothing arrived in time.
        """
        data = self._read(len(buffer))
        if data is None:
            return None
        buffer[: len(data)] = data
        return len(data)

    def readall(self) -> Optional[bytes]:
        """Return everything received so far, or None if nothing arrived."""
        return self._read()

    def read(
        self, size: Optional[int] = None, timeout: Optional[float] = None
    ) -> Optional[bytes]:
        """Read *size* bytes (everything when None), optionally with a timeout.

        Args:
            size: Maximum number of bytes to read; None reads all available.
            timeout: Seconds to wait for this call only.  A falsy value
                (None or 0) keeps the stream's default timeout.

        Returns:
            The bytes read, or None if nothing arrived in time.
        """
        if not timeout:
            return super().read() if size is None else super().read(size)
        saved_timeout = self.timeout
        self.timeout = timeout
        try:
            return super().read() if size is None else super().read(size)
        finally:
            # Restore the default even when the read raises.
            self.timeout = saved_timeout

    def write(self, b: "ReadableBuffer") -> Optional[int]:
        """Write *b* to the write characteristic, waiting for the response.

        Returns:
            The number of bytes written, i.e. ``len(b)``.

        Raises:
            IOError: If the write characteristic is not available.
        """
        self._ensure_connected()
        if self._write_handle is None:
            raise IOError("write_handle not open")
        self._write_handle.write(b, withResponse=True)
        return len(b)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        """Disconnect; a later read or write reconnects on demand."""
        self._disconnect()

    def seekable(self) -> bool:
        return False

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return True


def write(fh, data):
    """Append the CRC16/MODBUS of *data* (low byte first) and write the frame.

    Args:
        fh: Any object with a ``write(bytes)`` method.
        data: The frame without checksum; anything ``bytes()`` accepts.
    """
    frame = bytes(data)
    crc = modbus(frame)
    fh.write(frame + struct.pack("<H", crc & 0xFFFF))


def construct_request(address, words=1, action=ACTION_READ, marker=0xFF):
    """Build the six-byte request frame (without checksum).

    Args:
        address: 16-bit register address.
        words: 16-bit word count (or the value, for a write).
        action: Function code, ``ACTION_READ`` by default.
        marker: First byte of the frame; must be one of ``POSSIBLE_MARKER``.

    Returns:
        The frame as bytes, big-endian, ready to be passed to ``write``.

    Raises:
        ValueError: If *marker* is not one of ``POSSIBLE_MARKER``.
    """
    if marker not in POSSIBLE_MARKER:
        raise ValueError(f"marker should be one of {POSSIBLE_MARKER}")
    return struct.pack("!BBHH", marker, action, address, words)


def poll(dev: btle.Peripheral, timeout: float = 1) -> bool:
    """Wait for notifications and process every one that follows.

    Args:
        dev: Connected peripheral.
        timeout: Seconds to wait for the first notification.

    Returns:
        False if no notification arrived within *timeout*, otherwise True
        once notifications stopped arriving for 0.2 s.
    """
    deadline = time.monotonic() + timeout
    while not dev.waitForNotifications(0.2):
        if time.monotonic() >= deadline:
            return False
    while dev.waitForNotifications(0.2):
        pass
    return True


def log(string: str):
    """Print *string* prefixed with the current UTC time and flush stdout."""
    print(_utc_timestamp(), string)
    sys.stdout.flush()


class PacketCRCError(ValueError):
    """Checksum mismatch; carries the decoded parts of the offending frame."""

    def __init__(self, tag, operation, size, payload, crc, calculated_crc):
        super().__init__(
            f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}."
        )
        self.tag = tag
        self.operation = operation
        self.size = size
        self.payload = payload
        self.crc = crc
        self.calculated_crc = calculated_crc


def parse_packet(data):
    """Split a response frame into its payload and verify its checksum.

    The frame is read as marker, function code, payload size, *size* payload
    bytes and a little-endian CRC16/MODBUS over everything before it.

    Args:
        data: Buffer holding the complete frame.

    Returns:
        The payload as a tuple of ints.

    Raises:
        ValueError: If the checksum does not match (the exception carries
            ``tag``, ``operation``, ``size``, ``payload``, ``crc`` and
            ``calculated_crc`` attributes).
        struct.error: If *data* is shorter than its size byte announces.
    """
    tag, operation, size = struct.unpack_from("BBB", data)
    *body, crc = struct.unpack_from(f"<{size}BH", data, offset=3)
    payload = tuple(body)
    calculated_crc = modbus(bytes([tag, operation, size, *payload]))
    if crc != calculated_crc:
        raise PacketCRCError(tag, operation, size, payload, crc, calculated_crc)
    return payload


def main():
    """Connect to ``MAC`` and poll panel and battery status forever.

    Reconnects after one second whenever the device disconnects.
    """
    prev = time.time() - INTERVAL
    while True:
        # A fresh delegate per connection, so a partial frame left over from
        # a dropped connection cannot corrupt the next one.
        dlgt = Delegate()
        try:
            log("Connecting...")
            with btle.Peripheral(MAC).withDelegate(dlgt) as dev:
                wd = dev.getCharacteristics(uuid=WRITE_DEVICE)[0]
                log("Connected.")
                poll(dev)

                write(wd, construct_request(0, 32))
                poll(dev)
                poll(dev)
                poll(dev)

                # Memory dump
                # for address in range(0, 0x10000, 16):
                #     log(f"Reading 0x{address:04X}...")
                #     write(wd, construct_request(address, 16))
                #     poll(dev)
                #     poll(dev)
                #     poll(dev)
                #     poll(dev)
                #     poll(dev)

                while True:
                    poll(dev)
                    now = time.time()
                    diff = now - prev
                    if diff >= INTERVAL:
                        # Keep the cadence when on schedule, but do not replay
                        # every interval missed during a long outage.
                        prev = now if diff >= 2 * INTERVAL else prev + INTERVAL
                        write(wd, construct_request(0x0107, 4))  # CMD_GET_PANEL_STATUS
                        poll(dev)
                        write(wd, construct_request(0x0100, 7))  # CMD_GET_BATTERY_STATE
                        poll(dev)
                        # if STATUS.get('load_enabled'):
                        #     write(wd, CMD_DISABLE_LOAD)
                        # else:
                        #     write(wd, CMD_ENABLE_LOAD)
        except btle.BTLEDisconnectError:
            # Leaving the ``with`` block has already disconnected the
            # peripheral; nothing else needs closing here.
            log("ERROR: Disconnected")
            time.sleep(1)


if __name__ == "__main__":
    main()