srne-mqtt/srnemqtt/protocol.py

296 lines
8.5 KiB
Python

# -*- coding: utf-8 -*-
import struct
import sys
import time
from typing import Callable, Collection, Optional
from libscrc import modbus # type: ignore
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
from .interfaces import BaseInterface
from .solar_types import (
DATA_BATTERY_STATE,
HISTORICAL_DATA,
ChargerState,
DataItem,
HistoricalData,
HistoricalExtraInfo,
)
from .util import log
def write(fh, data):
bdata = bytes(data)
crc = modbus(bdata)
bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8])
fh.write(data + bcrc)
def construct_read_request(address, words=1, marker=0xFF):
assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
return struct.pack("!BBHH", marker, ACTION_READ, address, words)
def construct_write_request(address, marker=0xFF):
assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
return struct.pack("!BBH", marker, ACTION_WRITE, address)
def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
pos = offset
res = {}
for i in items:
res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0])
pos += i.st_size
return res
# GET_BATTERY_STATE
def parse_battery_state(data: bytes) -> dict:
return parse(data, DATA_BATTERY_STATE)
def parse_historical_entry(data: bytes) -> dict:
res = parse(data, HISTORICAL_DATA[:10])
res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]])
if len(data) > res_datalen:
res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen))
return res
def parse_packet(data):
tag, operation, size = struct.unpack_from("BBB", data)
_unpacked = struct.unpack_from(f"<{size}BH", data, offset=3)
crc = _unpacked[-1]
payload = _unpacked[:-1]
calculated_crc = modbus(bytes([tag, operation, size, *payload]))
if crc != calculated_crc:
e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.")
# e.tag = tag
# e.operation = operation
# e.size = size
# e.payload = payload
# e.crc = crc
# e.calculated_crc = calculated_crc
raise e
return payload
def discardUntil(fh: BaseInterface, byte: int, timeout=10) -> Optional[int]:
assert byte >= 0 and byte < 256, f"byte: Expected 8bit unsigned int, got {byte}"
def expand(b: Optional[bytes]):
if not b:
return None
return b[0]
start = time.time()
discarded = 0
read_byte = expand(fh.read(1))
while read_byte != byte:
if read_byte is not None:
if not discarded:
log("Discarding", end="")
discarded += 1
print(f" {read_byte:02X}", end="")
sys.stdout.flush()
if time.time() - start > timeout:
read_byte = None
break
read_byte = expand(fh.read(1))
if discarded:
print()
sys.stdout.flush()
return read_byte
def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[bytes]:
# log(f"Reading {words} words from 0x{address:04X}")
request = construct_read_request(address, words=words)
# log("Request:", request)
write(fh, request)
tag = discardUntil(fh, 0xFF)
if tag is None:
return None
header = fh.read(2)
if header and len(header) == 2:
operation, size = header
data = fh.read(size)
_crc = fh.read(2)
if data and _crc:
try:
crc = struct.unpack_from("<H", _crc)[0]
except struct.error:
log(f"readMemory: CRC error; read {len(_crc)} bytes (2 expected)")
return None
calculated_crc = modbus(bytes([tag, operation, size, *data]))
if crc == calculated_crc:
return data
else:
log(f"readMemory: CRC error; {crc:04X} != {calculated_crc:04X}")
log("data or crc is falsely", header, data, _crc)
return None
# set(255, 266, 1 or 0)
# ff 06 01 0a 00 01
# CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01"
# CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00"
# REG_LOAD_ENABLE = 0x010A
def writeMemory(fh: BaseInterface, address: int, data: bytes):
if len(data) % 2:
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
header = construct_write_request(address)
write(fh, header + data)
tag = discardUntil(fh, 0xFF)
if tag is None:
return None
header = fh.read(3)
if header and len(header) == 3:
operation, size, address = header
rdata = fh.read(size * 2)
_crc = fh.read(2)
if rdata and _crc:
try:
crc = struct.unpack_from("<H", _crc)[0]
except struct.error:
log(f"writeMemory: CRC error; read {len(_crc)} bytes (2 expected)")
return None
calculated_crc = modbus(bytes([tag, operation, size, address, *rdata]))
if crc == calculated_crc:
return rdata
else:
log(f"writeMemory: CRC error; {crc:04X} != {calculated_crc:04X}")
log("data or crc is falsely", header, rdata, _crc)
return None
def try_read_parse(
dev: BaseInterface,
address: int,
words: int = 1,
parser: Optional[Callable] = None,
attempts=5,
) -> Optional[dict]:
while attempts:
attempts -= 1
res = readMemory(dev, address, words)
if res:
try:
if parser:
return parser(res)
except struct.error as e:
log(e)
log("0x0100 Unpack error:", len(res), res)
_timeout = dev.timeout
dev.timeout = 0.5
log("Flushed from read buffer; ", dev.read())
dev.timeout = _timeout
else:
log(f"No data read, expected {words*2} bytes (attempts left: {attempts})")
return None
class ChargeController:
device: BaseInterface
def __init__(self, device: BaseInterface):
self.device = device
@property
def serial(self) -> str:
data = readMemory(self.device, 0x18, 3)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
p1 = data[0]
p2 = data[1]
p3 = (data[2] << 8) + data[3]
return f"{p1}-{p2}-{p3}"
@property
def model(self) -> str:
data = readMemory(self.device, 0x0C, 8)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return data.decode("utf-8").strip()
@property
def version(self) -> str:
data = readMemory(self.device, 0x14, 4)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
major = (data[0] << 8) + data[1]
minor = data[2]
patch = data[3]
return f"{major}.{minor}.{patch}"
@property
def load_enabled(self) -> bool:
data = readMemory(self.device, 0x010A, 1)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return struct.unpack("x?", data)[0]
@load_enabled.setter
def load_enabled(self, value: bool):
data = writeMemory(self.device, 0x010A, struct.pack("x?", value))
if data is not None:
res = struct.unpack("x?", data)[0]
if res != value:
log(f"setting load_enabled failed; {res!r} != {value!r}")
else:
log("setting load_enabled failed; communications error")
@property
def state(self) -> ChargerState:
data = readMemory(self.device, 0x0100, 11)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return ChargerState(data)
def get_historical(self, day) -> HistoricalData:
data = readMemory(self.device, 0xF000 + day, 10)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalData(data)
@property
def today(self) -> HistoricalData:
data = readMemory(self.device, 0x010B, 10)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalData(data)
@property
def extra(self) -> HistoricalExtraInfo:
data = readMemory(self.device, 0x0115, 11)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalExtraInfo(data)