# -*- coding: utf-8 -*-
import struct
import sys
import time
from io import RawIOBase
from typing import Callable, Collection, Optional

from libscrc import modbus

from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
from .interfaces import BaseInterface
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem
from .util import log


def write(fh, data):
    """Write *data* to *fh*, followed by its two-byte CRC (low byte first).

    The CRC is computed with ``libscrc.modbus`` over ``bytes(data)``.

    Args:
        fh: Object exposing ``write(bytes)``.
        data: Frame contents; anything ``bytes()`` accepts.
    """
    bdata = bytes(data)
    crc = modbus(bdata)
    bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8])
    # Send the coerced bytes so the payload on the wire is exactly what the
    # CRC was computed over, even when *data* is not a bytes object.
    fh.write(bdata + bcrc)


def construct_request(address, words=1, action=ACTION_READ, marker=0xFF):
    """Pack a request frame without its CRC.

    Layout (network byte order): marker byte, action byte, 16-bit address,
    16-bit word count.

    Args:
        address: Value packed as the 16-bit address field.
        words: Value packed as the 16-bit count field.
        action: Value packed as the action byte.
        marker: Leading byte; must be a member of ``POSSIBLE_MARKER``.

    Returns:
        The six packed bytes.
    """
    assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
    return struct.pack("!BBHH", marker, action, address, words)


def construct_write(address, data: bytes, action=ACTION_WRITE, marker=0xFF):
    """Pack a write frame without its CRC.

    Layout (network byte order): marker byte, action byte, 16-bit address,
    then *data* appended verbatim.

    Args:
        address: Value packed as the 16-bit address field.
        data: Raw bytes appended after the four-byte header.
        action: Value packed as the action byte.
        marker: Leading byte; must be a member of ``POSSIBLE_MARKER``.

    Returns:
        The packed header followed by *data*.
    """
    assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
    return struct.pack("!BBH", marker, action, address) + data


def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
    """Decode consecutive fields from *data* as described by *items*.

    Each item supplies ``st_format`` (a ``struct`` format), ``st_size`` (how
    far to advance), ``transform`` (applied to the first unpacked value) and
    ``name`` (the key in the result).

    Args:
        data: Buffer to decode.
        items: Field descriptors, in buffer order.
        offset: Position in *data* of the first field.

    Returns:
        Mapping of item name to transformed value.

    Raises:
        struct.error: If *data* is too short for one of the fields.
    """
    pos = offset
    res = {}
    for item in items:
        raw = struct.unpack_from(item.st_format, data, offset=pos)[0]
        res[item.name] = item.transform(raw)
        pos += item.st_size
    return res


# GET_BATTERY_STATE
def parse_battery_state(data: bytes) -> dict:
    """Decode *data* using the ``DATA_BATTERY_STATE`` field list."""
    return parse(data, DATA_BATTERY_STATE)


def parse_historical_entry(data: bytes) -> dict:
    """Decode a historical-data record.

    The first ten ``HISTORICAL_DATA`` fields are always decoded. The
    remaining fields are decoded only when *data* is longer than those ten
    fields combined.
    """
    head = HISTORICAL_DATA[:10]
    res = parse(data, head)
    head_len = sum(x.st_size for x in head)
    if len(data) > head_len:
        res.update(parse(data, HISTORICAL_DATA[10:], offset=head_len))
    return res


def parse_packet(data):
    """Validate a framed packet and return its payload.

    The frame is read as three header bytes (tag, operation, size), then
    ``size`` payload bytes, then a little-endian 16-bit CRC. The CRC is
    checked against ``libscrc.modbus`` over header plus payload.

    Args:
        data: Buffer holding the complete frame.

    Returns:
        The payload as a tuple of ints.

    Raises:
        ValueError: On CRC mismatch. The exception carries ``tag``,
            ``operation``, ``size``, ``payload``, ``crc`` and
            ``calculated_crc`` attributes for diagnostics.
        struct.error: If *data* is shorter than the frame it announces.
    """
    tag, operation, size = struct.unpack_from("BBB", data)
    _unpacked = struct.unpack_from(f"<{size}BH", data, offset=3)
    crc = _unpacked[-1]
    payload = _unpacked[:-1]
    calculated_crc = modbus(bytes([tag, operation, size, *payload]))
    if crc != calculated_crc:
        e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.")
        e.tag = tag
        e.operation = operation
        e.size = size
        e.payload = payload
        e.crc = crc
        e.calculated_crc = calculated_crc
        raise e
    return payload
# NOTE(review): several definitions are collapsed onto the single physical
# line below, and it is not valid Python as written. It holds discardUntil,
# readMemory, and the bodies of two further functions whose `def` headers are
# not present. In two places `struct.unpack_from("` is followed directly by a
# return annotation (` Optional[bytes]:` / ` Optional[dict]:`), so text appears
# to have been lost there -- TODO restore this code from version control
# rather than reconstructing it by hand.
#
# discardUntil, as far as it is visible here: reads one byte at a time from
# `fh` until the value equals `byte`, printing each discarded byte in hex;
# the elapsed-time check against `timeout` runs after each non-matching read
# and makes it return None. Otherwise it returns the matching byte value.
# NOTE(review): the inner `expand` helper indexes b[0] for any non-None read
# result, so an empty bytes result would raise IndexError -- confirm that
# fh.read(1) never returns b"".
def discardUntil(fh: RawIOBase, byte: int, timeout=10) -> Optional[int]: assert byte >= 0 and byte < 256, f"byte: Expected 8bit unsigned int, got {byte}" def expand(b: Optional[bytes]): if b is None: return b return b[0] start = time.time() discarded = 0 read_byte = expand(fh.read(1)) while read_byte != byte: if read_byte is not None: if not discarded: log("Discarding", end="") discarded += 1 print(f" {read_byte:02X}", end="") sys.stdout.flush() if time.time() - start > timeout: read_byte = None break read_byte = expand(fh.read(1)) if discarded: print() sys.stdout.flush() return read_byte def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[bytes]: # log(f"Reading {words} words from 0x{address:04X}") request = construct_request(address, words=words) # log("Request:", request) write(fh, request) tag = discardUntil(fh, 0xFF) if tag is None: return None header = fh.read(2) if header and len(header) == 2: operation, size = header data = fh.read(size) _crc = fh.read(2) if data and _crc: try: crc = struct.unpack_from(" Optional[bytes]: # TODO: Verify behavior on multi-word writes # log(f"Reading {words} words from 0x{address:04X}") request = construct_write(address, data=output_data) # log("Request:", request) write(fh, request) tag = discardUntil(fh, 0xFF) if tag is None: return None _operation = fh.read(1) result_addr = fh.read(2) # log("Operation:", _operation) if _operation is not None and result_addr is not None: operation = _operation[0] data = fh.read(2) # log("Data:", data) _crc = fh.read(2) if data and _crc: try: crc = struct.unpack_from(" Optional[dict]: while attempts: attempts -= 1 res = readMemory(dev, address, words) if res: try: if parser: return parser(res) except struct.error as e: log(e) log("0x0100 Unpack error:", len(res), res) log("Flushed from read buffer; ", dev.read()) # TODO: timeout=0.5 else: log(f"No data read, expected {words*2} bytes (attempts left: {attempts})") return None