# -*- coding: utf-8 -*- import struct import sys import time from typing import Callable, Collection, Optional from libscrc import modbus # type: ignore from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER from .interfaces import BaseInterface from .solar_types import ( DATA_BATTERY_STATE, HISTORICAL_DATA, ChargerState, DataItem, HistoricalData, HistoricalExtraInfo, ) from .util import log def write(fh, data): bdata = bytes(data) crc = modbus(bdata) bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8]) fh.write(data + bcrc) def construct_read_request(address, words=1, marker=0xFF): assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}" return struct.pack("!BBHH", marker, ACTION_READ, address, words) def construct_write_request(address, marker=0xFF): assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}" return struct.pack("!BBH", marker, ACTION_WRITE, address) def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict: pos = offset res = {} for i in items: res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0]) pos += i.st_size return res # GET_BATTERY_STATE def parse_battery_state(data: bytes) -> dict: return parse(data, DATA_BATTERY_STATE) def parse_historical_entry(data: bytes) -> dict: res = parse(data, HISTORICAL_DATA[:10]) res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]]) if len(data) > res_datalen: res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen)) return res def parse_packet(data): tag, operation, size = struct.unpack_from("BBB", data) _unpacked = struct.unpack_from(f"<{size}BH", data, offset=3) crc = _unpacked[-1] payload = _unpacked[:-1] calculated_crc = modbus(bytes([tag, operation, size, *payload])) if crc != calculated_crc: e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.") # e.tag = tag # e.operation = operation # e.size = size # e.payload = payload # e.crc = crc # e.calculated_crc = calculated_crc raise e return payload def discardUntil(fh: BaseInterface, byte: int, timeout=10) -> Optional[int]: assert byte >= 0 and byte < 256, f"byte: Expected 8bit unsigned int, got {byte}" def expand(b: Optional[bytes]): if not b: return None return b[0] start = time.time() discarded = 0 read_byte = expand(fh.read(1)) while read_byte != byte: if read_byte is not None: if not discarded: log("Discarding", end="") discarded += 1 print(f" {read_byte:02X}", end="") sys.stdout.flush() if time.time() - start > timeout: read_byte = None break read_byte = expand(fh.read(1)) if discarded: print() sys.stdout.flush() return read_byte def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[bytes]: # log(f"Reading {words} words from 0x{address:04X}") request = construct_read_request(address, words=words) # log("Request:", request) write(fh, request) tag = discardUntil(fh, 0xFF) if tag is None: return None header = fh.read(2) if header and len(header) == 2: operation, size = header data = fh.read(size) _crc = fh.read(2) if data and _crc: try: crc = struct.unpack_from(" Optional[dict]: while attempts: attempts -= 1 res = readMemory(dev, address, words) if res: try: if parser: return parser(res) except struct.error as e: log(e) log("0x0100 Unpack error:", len(res), res) _timeout = dev.timeout dev.timeout = 0.5 log("Flushed from read buffer; ", dev.read()) dev.timeout = _timeout else: log(f"No data read, expected {words*2} bytes (attempts left: {attempts})") return None class ChargeController: device: BaseInterface def __init__(self, device: BaseInterface): self.device = device _cached_serial: str | None = None @property def serial(self) -> str: if self._cached_serial is not None: return self._cached_serial data = readMemory(self.device, 0x18, 3) if data is None: raise IOError # FIXME: Raise specific error in readMemory p1 = data[0] p2 = data[1] p3 = (data[2] << 8) + data[3] self._cached_serial = f"{p1}-{p2}-{p3}" return self._cached_serial _cached_model: str | None = None @property def model(self) -> str: if self._cached_model is not None: return self._cached_model data = readMemory(self.device, 0x0C, 8) if data is None: raise IOError # FIXME: Raise specific error in readMemory self._cached_model = data.decode("utf-8").strip() return self._cached_model _cached_version: str | None = None @property def version(self) -> str: if self._cached_version is not None: return self._cached_version data = readMemory(self.device, 0x14, 4) if data is None: raise IOError # FIXME: Raise specific error in readMemory major = (data[0] << 8) + data[1] minor = data[2] patch = data[3] self._cached_version = f"{major}.{minor}.{patch}" return self._cached_version @property def load_enabled(self) -> bool: data = readMemory(self.device, 0x010A, 1) if data is None: raise IOError # FIXME: Raise specific error in readMemory return struct.unpack("x?", data)[0] @load_enabled.setter def load_enabled(self, value: bool): data = writeMemory(self.device, 0x010A, struct.pack("x?", value)) if data is not None: res = struct.unpack("x?", data)[0] if res != value: log(f"setting load_enabled failed; {res!r} != {value!r}") else: log("setting load_enabled failed; communications error") @property def state(self) -> ChargerState: data = readMemory(self.device, 0x0100, 11) if data is None: raise IOError # FIXME: Raise specific error in readMemory return ChargerState(data) def get_historical(self, day) -> HistoricalData: data = readMemory(self.device, 0xF000 + day, 10) if data is None: raise IOError # FIXME: Raise specific error in readMemory return HistoricalData(data) @property def today(self) -> HistoricalData: data = readMemory(self.device, 0x010B, 10) if data is None: raise IOError # FIXME: Raise specific error in readMemory return HistoricalData(data) @property def extra(self) -> HistoricalExtraInfo: data = readMemory(self.device, 0x0115, 11) if data is None: raise IOError # FIXME: Raise specific error in readMemory return HistoricalExtraInfo(data)