srne-mqtt/srnemqtt/protocol.py

375 lines
11 KiB
Python

# -*- coding: utf-8 -*-
import struct
import time
from logging import getLogger
from typing import Callable, Collection, List, Optional
from libscrc import modbus # type: ignore
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
from .interfaces import BaseInterface
from .solar_types import (
DATA_BATTERY_STATE,
HISTORICAL_DATA,
ChargerState,
DataItem,
HistoricalData,
HistoricalExtraInfo,
)
logger = getLogger(__name__)
def write(fh, data):
bdata = bytes(data)
crc = modbus(bdata)
bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8])
fh.write(data + bcrc)
def construct_read_request(address, words=1, marker=0xFF):
assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
return struct.pack("!BBHH", marker, ACTION_READ, address, words)
def construct_write_request(address, marker=0xFF):
assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
return struct.pack("!BBH", marker, ACTION_WRITE, address)
def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
pos = offset
res = {}
for i in items:
res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0])
pos += i.st_size
return res
# GET_BATTERY_STATE
def parse_battery_state(data: bytes) -> dict:
return parse(data, DATA_BATTERY_STATE)
def parse_historical_entry(data: bytes) -> dict:
res = parse(data, HISTORICAL_DATA[:10])
res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]])
if len(data) > res_datalen:
res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen))
return res
def parse_packet(data):
tag, operation, size = struct.unpack_from("BBB", data)
_unpacked = struct.unpack_from(f"<{size}BH", data, offset=3)
crc = _unpacked[-1]
payload = _unpacked[:-1]
calculated_crc = modbus(bytes([tag, operation, size, *payload]))
if crc != calculated_crc:
e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.")
# e.tag = tag
# e.operation = operation
# e.size = size
# e.payload = payload
# e.crc = crc
# e.calculated_crc = calculated_crc
raise e
return payload
def discardUntil(fh: BaseInterface, byte: int, timeout=10) -> Optional[int]:
assert byte >= 0 and byte < 256, f"byte: Expected 8bit unsigned int, got {byte}"
def expand(b: Optional[bytes]):
if not b:
return None
return b[0]
start = time.time()
discarded: List[str] = []
read_byte = expand(fh.read(1))
while read_byte != byte:
if read_byte is not None:
if not discarded:
discarded.append("Discarding")
discarded.append(f"{read_byte:02X}")
if time.time() - start > timeout:
read_byte = None
break
read_byte = expand(fh.read(1))
if discarded:
logger.debug(" ".join(discarded))
return read_byte
def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[bytes]:
# log(f"Reading {words} words from 0x{address:04X}")
request = construct_read_request(address, words=words)
# log("Request:", request)
write(fh, request)
tag = discardUntil(fh, 0xFF)
if tag is None:
return None
header = fh.read(2)
if header and len(header) == 2:
operation, size = header
data = fh.read(size)
_crc = fh.read(2)
if data and _crc:
try:
crc = struct.unpack_from("<H", _crc)[0]
except struct.error:
logger.error(
"readMemory: CRC error; read %s bytes (2 expected)", len(_crc)
)
return None
calculated_crc = modbus(bytes([tag, operation, size, *data]))
if crc == calculated_crc:
return data
else:
logger.error(
f"readMemory: CRC error; {crc:04X} != {calculated_crc:04X}"
)
logger.error("data or crc is falsely %s %s %s", header, data, _crc)
return None
# set(255, 266, 1 or 0)
# ff 06 01 0a 00 01
# CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01"
# CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00"
# REG_LOAD_ENABLE = 0x010A
def writeMemory(fh: BaseInterface, address: int, data: bytes):
if len(data) != 2:
raise ValueError(f"Data must consist of a two-byte word, got {len(data)} bytes")
header = construct_write_request(address)
write(fh, header + data)
tag = discardUntil(fh, 0xFF)
if tag is None:
return None
header = fh.read(3)
if header and len(header) == 3:
operation, size, address = header
logger.log(5, header)
# size field is zero when writing device name for whatever reason
# write command seems to only accept a single word, so this is fine;
# we just hardcode the number of bytes read to two here.
rdata = fh.read(2)
_crc = fh.read(2)
if rdata and _crc:
try:
crc = struct.unpack_from("<H", _crc)[0]
except struct.error:
logger.error(
f"writeMemory: CRC error; read {len(_crc)} bytes (2 expected)"
)
return None
calculated_crc = modbus(bytes([tag, operation, size, address, *rdata]))
if crc == calculated_crc:
return rdata
else:
logger.error(
f"writeMemory: CRC error; {crc:04X} != {calculated_crc:04X}"
)
logger.error("data or crc is falsely %s %s %s", header, rdata, _crc)
return None
def writeMemoryMultiple(fh: BaseInterface, address: int, data: bytes):
if len(data) % 2:
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
res = bytearray()
for i in range(len(data) // 2):
d = data[i * 2 : (i + 1) * 2]
r = writeMemory(fh, address + i, d)
if r:
res.extend(r)
return res
def try_read_parse(
dev: BaseInterface,
address: int,
words: int = 1,
parser: Optional[Callable] = None,
attempts=5,
) -> Optional[dict]:
while attempts:
attempts -= 1
res = readMemory(dev, address, words)
if res:
try:
if parser:
return parser(res)
except struct.error:
logger.exception("0x0100 Unpack error: %s %s", len(res), res)
_timeout = dev.timeout
dev.timeout = 0.5
logger.warning("Flushed from read buffer; %s", dev.read())
dev.timeout = _timeout
else:
logger.warning(
f"No data read, expected {words*2} bytes (attempts left: {attempts})"
)
return None
class ChargeController:
device: BaseInterface
manufacturer: str = "SRNE Solar Co., Ltd."
manufacturer_id: str = "srne"
def __init__(self, device: BaseInterface):
self.device = device
_cached_serial: str | None = None
@property
def serial(self) -> str:
if self._cached_serial is not None:
return self._cached_serial
data = readMemory(self.device, 0x18, 3)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
p1 = data[0]
p2 = data[1]
p3 = (data[2] << 8) + data[3]
self._cached_serial = f"{p1:02n}-{p2:02n}-{p3:04n}"
return self._cached_serial
_cached_model: str | None = None
@property
def model(self) -> str:
if self._cached_model is not None:
return self._cached_model
data = readMemory(self.device, 0x0C, 8)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
self._cached_model = data.decode("utf-8").strip()
return self._cached_model
_cached_version: str | None = None
@property
def version(self) -> str:
if self._cached_version is not None:
return self._cached_version
data = readMemory(self.device, 0x14, 4)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
major = (data[0] << 8) + data[1]
minor = data[2]
patch = data[3]
self._cached_version = f"{major}.{minor}.{patch}"
return self._cached_version
_cached_name: str | None = None
@property
def name(self) -> str:
if self._cached_name is not None:
return self._cached_name
data = readMemory(self.device, 0x0049, 16)
if data is None:
raise IOError
res = data.decode("UTF-16BE").strip()
return res
@name.setter
def name(self, value: str):
bin_value = bytearray(value.encode("UTF-16BE"))
if len(bin_value) > 32:
raise ValueError(
f"value must be no more than 32 bytes once encoded as UTF-16BE. {len(bin_value)} bytes supplied"
)
# Pad name to 32 bytes to ensure ensure nothing is left of old name
while len(bin_value) < 32:
bin_value.extend(b"\x00\x20")
data = writeMemoryMultiple(self.device, 0x0049, bin_value)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
res = data.decode("UTF-16BE").strip()
if res != value:
logger.error("setting device name failed; %r != %r", res, value)
self._cached_name = value
@property
def load_enabled(self) -> bool:
data = readMemory(self.device, 0x010A, 1)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return struct.unpack("x?", data)[0]
@load_enabled.setter
def load_enabled(self, value: bool):
data = writeMemory(self.device, 0x010A, struct.pack("x?", value))
if data is not None:
res = struct.unpack("x?", data)[0]
if res != value:
logger.error("setting load_enabled failed; %r != %r", res, value)
else:
logger.error("setting load_enabled failed; communications error")
@property
def state(self) -> ChargerState:
data = readMemory(self.device, 0x0100, 11)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return ChargerState(data)
def get_historical(self, day) -> HistoricalData:
data = readMemory(self.device, 0xF000 + day, 10)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalData(data)
@property
def today(self) -> HistoricalData:
data = readMemory(self.device, 0x010B, 10)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalData(data)
@property
def extra(self) -> HistoricalExtraInfo:
data = readMemory(self.device, 0x0115, 11)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalExtraInfo(data)