2023-01-07 21:18:30 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import struct
|
|
|
|
import sys
|
|
|
|
import time
|
|
|
|
from io import RawIOBase
|
|
|
|
from typing import Callable, Collection, Optional
|
|
|
|
|
|
|
|
from libscrc import modbus
|
|
|
|
|
2023-04-10 01:39:19 +00:00
|
|
|
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
|
|
|
|
from .interfaces import BaseInterface
|
2023-01-07 21:18:30 +00:00
|
|
|
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem
|
|
|
|
from .util import log
|
|
|
|
|
|
|
|
|
|
|
|
def write(fh, data):
    """Append a checksum to ``data`` and write the resulting frame to ``fh``.

    The checksum is computed with ``modbus`` over the byte form of ``data``
    and appended low byte first.

    Args:
        fh: Object exposing a ``write`` method that accepts ``bytes``.
        data: Frame contents; anything ``bytes()`` can convert.
    """
    bdata = bytes(data)
    crc = modbus(bdata)
    # Low byte first, then high byte.
    bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8])
    # Send the normalised copy: concatenating the raw ``data`` with ``bytes``
    # raises TypeError for inputs such as a list of ints, even though the
    # checksum above was computed for them without complaint.
    fh.write(bdata + bcrc)
|
|
|
|
|
|
|
|
|
|
|
|
def construct_request(address, words=1, action=ACTION_READ, marker=0xFF):
    """Build the six-byte request header for ``words`` words at ``address``.

    The fields are packed in network byte order as: marker (1 byte),
    action (1 byte), address (2 bytes), word count (2 bytes). No checksum
    is included.

    Args:
        address: Value packed into the 16-bit address field.
        words: Value packed into the 16-bit word-count field.
        action: Value packed into the action byte.
        marker: Leading byte; must be a member of ``POSSIBLE_MARKER``.

    Returns:
        The packed header as ``bytes``.
    """
    assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
    header_layout = struct.Struct("!BBHH")
    return header_layout.pack(marker, action, address, words)
|
|
|
|
|
|
|
|
|
2023-04-10 01:39:19 +00:00
|
|
|
def construct_write(address, data: bytes, action=ACTION_WRITE, marker=0xFF):
    """Build a write request: a four-byte header followed by ``data``.

    The header is packed in network byte order as: marker (1 byte),
    action (1 byte), address (2 bytes). No checksum is included.

    Args:
        address: Value packed into the 16-bit address field.
        data: Raw bytes appended verbatim after the header.
        action: Value packed into the action byte.
        marker: Leading byte; must be a member of ``POSSIBLE_MARKER``.

    Returns:
        Header and payload concatenated.
    """
    assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
    prefix = struct.pack("!BBH", marker, action, address)
    return prefix + data
|
|
|
|
|
|
|
|
|
2023-01-07 21:18:30 +00:00
|
|
|
def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
    """Decode consecutive fields from ``data`` according to ``items``.

    Each item supplies a struct format (``st_format``), the number of bytes
    it occupies (``st_size``), a ``name`` and a ``transform`` callable. The
    first value unpacked with the item's format is passed through
    ``transform`` and stored under ``name``.

    Args:
        data: Buffer to decode.
        items: Field descriptions, in the order they appear in ``data``.
        offset: Byte position of the first field.

    Returns:
        Mapping of item name to transformed value.
    """
    parsed = {}
    cursor = offset
    for item in items:
        raw_value = struct.unpack_from(item.st_format, data, cursor)[0]
        parsed[item.name] = item.transform(raw_value)
        # Advance past the field just consumed.
        cursor += item.st_size
    return parsed
|
|
|
|
|
|
|
|
|
|
|
|
# GET_BATTERY_STATE
|
|
|
|
def parse_battery_state(data: bytes) -> dict:
    """Decode ``data`` using the ``DATA_BATTERY_STATE`` field list.

    Args:
        data: Buffer to decode, starting at its first byte.

    Returns:
        Mapping of field name to decoded value, as produced by ``parse``.
    """
    battery_state = parse(data, DATA_BATTERY_STATE)
    return battery_state
|
|
|
|
|
|
|
|
|
|
|
|
def parse_historical_entry(data: bytes) -> dict:
    """Decode a historical record, including trailing fields when present.

    The first ten entries of ``HISTORICAL_DATA`` are always decoded. The
    remaining entries are decoded only when ``data`` is longer than those
    first ten fields.

    Args:
        data: Buffer holding the record.

    Returns:
        Mapping of field name to decoded value.
    """
    leading_items = HISTORICAL_DATA[:10]
    entry = parse(data, leading_items)

    leading_size = 0
    for item in leading_items:
        leading_size += item.st_size

    if len(data) <= leading_size:
        return entry

    # Extra bytes are present: decode the remaining fields after the first ten.
    trailing = parse(data, HISTORICAL_DATA[10:], offset=leading_size)
    entry.update(trailing)
    return entry
|
|
|
|
|
|
|
|
|
|
|
|
def parse_packet(data):
    """Split a complete response frame and verify its checksum.

    The frame is read as three single-byte header fields (tag, operation,
    payload size), ``size`` payload bytes, and a little-endian 16-bit
    checksum. The checksum is compared with ``modbus`` computed over the
    header and payload.

    Args:
        data: Buffer containing the whole frame.

    Returns:
        The payload as a tuple of ints.

    Raises:
        ValueError: When the received and calculated checksums differ. The
            exception carries ``tag``, ``operation``, ``size``, ``payload``,
            ``crc`` and ``calculated_crc`` attributes for diagnostics.
        struct.error: When ``data`` is shorter than the frame it announces.
    """
    tag, operation, size = struct.unpack_from("BBB", data)
    _unpacked = struct.unpack_from(f"<{size}BH", data, offset=3)
    payload = _unpacked[:-1]
    crc = _unpacked[-1]
    calculated_crc = modbus(bytes([tag, operation, size, *payload]))

    if crc == calculated_crc:
        return payload

    # Spelling of "mismatch" corrected in the message.
    e = ValueError(f"CRC mismatch: expected {crc:04X}, got {calculated_crc:04X}.")
    # Attach the decoded pieces so callers can inspect the bad frame.
    e.tag = tag
    e.operation = operation
    e.size = size
    e.payload = payload
    e.crc = crc
    e.calculated_crc = calculated_crc
    raise e
|
|
|
|
|
|
|
|
|
|
|
|
def discardUntil(fh: RawIOBase, byte: int, timeout=10) -> Optional[int]:
    """Consume ``fh`` one byte at a time until ``byte`` is read or time runs out.

    Bytes that do not match are dropped. The first dropped byte triggers a
    "Discarding" prefix via ``log``; every dropped byte is echoed to stdout
    as two hex digits, and a newline is printed at the end if anything was
    dropped.

    Args:
        fh: Stream to read from; ``read(1)`` may return one byte, ``None``
            or an empty ``bytes`` object.
        byte: Value to wait for, in the range 0..255.
        timeout: Seconds to keep reading before giving up.

    Returns:
        ``byte`` once it has been read, or ``None`` if the timeout expired.
    """
    assert 0 <= byte < 256, f"byte: Expected 8bit unsigned int, got {byte}"

    def expand(b: Optional[bytes]) -> Optional[int]:
        # A read may yield ``None`` or ``b""`` when nothing is available;
        # indexing the empty result would raise IndexError, so both are
        # reported as "no byte".
        if not b:
            return None
        return b[0]

    # A monotonic clock keeps the timeout immune to system clock changes.
    start = time.monotonic()
    discarded = 0
    read_byte = expand(fh.read(1))
    while read_byte != byte:
        if read_byte is not None:
            if not discarded:
                log("Discarding", end="")
            discarded += 1
            print(f" {read_byte:02X}", end="")
            sys.stdout.flush()

        if time.monotonic() - start > timeout:
            read_byte = None
            break

        read_byte = expand(fh.read(1))

    if discarded:
        # Terminate the line of hex digits echoed above.
        print()
        sys.stdout.flush()

    return read_byte
|
|
|
|
|
|
|
|
|
2023-04-10 01:39:19 +00:00
|
|
|
def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[bytes]:
    """Request ``words`` words at ``address`` and return the verified payload.

    A read request is written to ``fh``; input is then skipped up to the
    0xFF response tag, after which a two-byte header (operation, size),
    ``size`` payload bytes and a two-byte little-endian checksum are read.

    Args:
        fh: Interface to write the request to and read the response from.
        address: Address placed in the request.
        words: Word count placed in the request.

    Returns:
        The payload bytes when the checksum matches; ``None`` when the tag
        never arrives, any part of the response is missing or short, or the
        checksum does not match.
    """
    request = construct_request(address, words=words)
    write(fh, request)

    tag = discardUntil(fh, 0xFF)
    if tag is None:
        return None

    header = fh.read(2)
    if not header or len(header) != 2:
        return None

    operation, size = header
    data = fh.read(size)
    _crc = fh.read(2)
    if not data or not _crc:
        # Only reported when something really is missing; previously this
        # message was also emitted after a checksum mismatch.
        log("data or crc is falsely", header, data, _crc)
        return None

    if len(_crc) != 2:
        log(f"readMemory: CRC error; read {len(_crc)} bytes (2 expected)")
        return None

    crc = struct.unpack_from("<H", _crc)[0]
    # The checksum covers the tag and header as well as the payload.
    calculated_crc = modbus(bytes([tag, operation, size, *data]))
    if crc != calculated_crc:
        log(f"readMemory: CRC error; {crc:04X} != {calculated_crc:04X}")
        return None

    return data
|
|
|
|
|
|
|
|
|
2023-04-10 01:39:19 +00:00
|
|
|
def writeMemory(fh: BaseInterface, address: int, output_data: bytes) -> Optional[bytes]:
    """Write ``output_data`` at ``address`` and return the verified reply data.

    A write request is sent to ``fh``; input is then skipped up to the 0xFF
    response tag, after which one operation byte, a two-byte address, two
    data bytes and a two-byte little-endian checksum are read.

    Args:
        fh: Interface to write the request to and read the response from.
        address: Address placed in the request.
        output_data: Bytes appended to the request header.

    Returns:
        The two data bytes of the reply when the checksum matches; ``None``
        when the tag never arrives, any part of the reply is missing or
        short, or the checksum does not match.
    """
    # TODO: Verify behavior on multi-word writes
    request = construct_write(address, data=output_data)
    write(fh, request)

    tag = discardUntil(fh, 0xFF)
    if tag is None:
        return None

    _operation = fh.read(1)
    result_addr = fh.read(2)
    # An empty read is as unusable as ``None``; indexing it below would
    # raise IndexError.
    if not _operation or not result_addr:
        return None

    operation = _operation[0]
    data = fh.read(2)
    _crc = fh.read(2)
    if not data or not _crc:
        # Only reported when something really is missing; previously this
        # message was also emitted after a checksum mismatch.
        log("data or crc is falsely", operation, result_addr, data, _crc)
        return None

    if len(_crc) != 2:
        log(f"writeMemory: CRC error; read {len(_crc)} bytes (2 expected)")
        return None

    crc = struct.unpack_from("<H", _crc)[0]
    # The checksum covers the tag, operation and address as well as the data.
    calculated_crc = modbus(bytes([tag, operation, *result_addr, *data]))
    if crc != calculated_crc:
        log(f"writeMemory: CRC error; {crc:04X} != {calculated_crc:04X}")
        return None

    return data
|
|
|
|
|
|
|
|
|
2023-01-07 21:18:30 +00:00
|
|
|
def try_read_parse(
    dev: BaseInterface,
    address: int,
    words: int = 1,
    parser: Optional[Callable] = None,
    attempts=5,
) -> Optional[dict]:
    """Read memory and parse it, retrying on empty reads and unpack errors.

    Args:
        dev: Interface handed to ``readMemory``.
        address: Address to read.
        words: Number of words to read.
        parser: Callable applied to the bytes read; its result is returned.
        attempts: Maximum number of read attempts. Zero or a negative value
            performs no reads.

    Returns:
        Whatever ``parser`` returns for the first read it handles without
        raising ``struct.error``; ``None`` once all attempts are used up.
    """
    # Counting down with ``range`` also terminates for a negative ``attempts``,
    # where decrementing a truthy counter would never reach zero.
    for attempts_left in range(attempts - 1, -1, -1):
        res = readMemory(dev, address, words)
        if not res:
            log(f"No data read, expected {words*2} bytes (attempts left: {attempts_left})")
            continue

        # NOTE(review): without a parser the data read is dropped and the loop
        # simply reads again; kept as-is because callers may rely on it.
        if not parser:
            continue

        try:
            return parser(res)
        except struct.error as e:
            log(e)
            # Report the address actually read rather than a fixed 0x0100.
            log(f"0x{address:04X} Unpack error:", len(res), res)
            log("Flushed from read buffer; ", dev.read())  # TODO: timeout=0.5

    return None
|