diff --git a/misc/dump_memory_map.py b/misc/dump_memory_map.py index c69ef64..f9cbc2a 100644 --- a/misc/dump_memory_map.py +++ b/misc/dump_memory_map.py @@ -8,49 +8,12 @@ sys.path.insert(1, os.path.dirname(os.path.dirname(sys.argv[0]))) from draw_memory_map import memory_table # noqa: E402 from srnemqtt.config import get_config, get_interface # noqa: E402 -from srnemqtt.interfaces import BaseInterface # noqa: E402 from srnemqtt.protocol import readMemory # noqa: E402 - -def get_device_name(iface: BaseInterface) -> str | None: - data = readMemory(iface, 0x0C, 8) - if data is None: - return None - - return data.decode("utf-8").strip() - - -def get_device_version(iface: BaseInterface) -> str | None: - data = readMemory(iface, 0x14, 4) - if data is None: - return None - - major = (data[0] << 8) + data[1] - minor = data[2] - patch = data[3] - - return f"{major}.{minor}.{patch}" - - -def get_device_serial(iface: BaseInterface) -> str | None: - data = readMemory(iface, 0x18, 3) - if data is None: - return None - - p1 = data[0] - p2 = data[1] - p3 = (data[2] << 8) + data[3] - return f"{p1}-{p2}-{p3}" - - if __name__ == "__main__": conf = get_config() iface = get_interface(conf) - print(get_device_name(iface)) - print(get_device_version(iface)) - print(get_device_serial(iface)) - data: List[int] = [] for i in range(0, 0xFFFF, 16): newdata = readMemory(iface, i, 16) diff --git a/misc/test_bleuart.py b/misc/test_bleuart.py index 0d074d6..7e79f1f 100644 --- a/misc/test_bleuart.py +++ b/misc/test_bleuart.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- from srnemqtt.constants import MAC from srnemqtt.lib.feasycom_ble import BTLEUart -from srnemqtt.protocol import construct_request, write +from srnemqtt.protocol import construct_read_request, write with BTLEUart(MAC, timeout=1) as x: print(x) - write(x, construct_request(0x0E, words=3)) + write(x, construct_read_request(0x0E, words=3)) x.read(3, timeout=1) print(x.read(6, timeout=0.01)) x.read(2, timeout=0.01) diff --git a/misc/test_load_switch.py b/misc/test_load_switch.py new file mode 100644 index 0000000..c5d7234 --- /dev/null +++ b/misc/test_load_switch.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +import os +import sys +from time import sleep + +sys.path.insert(1, os.path.dirname(os.path.dirname(sys.argv[0]))) + +from srnemqtt.config import get_config, get_interface # noqa: E402 +from srnemqtt.protocol import ChargeController # noqa: E402 + +if __name__ == "__main__": + conf = get_config() + iface = get_interface(conf) + cc = ChargeController(iface) + + print(f"Serial: {cc.serial}") + print(f"Load enabled: {cc.load_enabled}") + cc.load_enabled = True + print(f"Load enabled: {cc.load_enabled}") + sleep(5) + cc.load_enabled = False + print(f"Load enabled: {cc.load_enabled}") diff --git a/misc/test_serial.py b/misc/test_serial.py index c55be89..dacf5b8 100644 --- a/misc/test_serial.py +++ b/misc/test_serial.py @@ -9,7 +9,7 @@ print(sys.path) sys.path.insert(1, os.path.dirname(os.path.dirname(sys.argv[0]))) # from srnemqtt.constants import MAC # from srnemqtt.lib.feasycom_ble import BTLEUart -from srnemqtt.protocol import construct_request, write # noqa: E402 +from srnemqtt.protocol import construct_read_request, write # noqa: E402 # for rate in [1200, 2400, 4800, 9600, 115200]: for rate in [9600]: @@ -19,7 +19,7 @@ for rate in [9600]: print(x) - write(x, construct_request(0x0E, words=3)) + write(x, construct_read_request(0x0E, words=3)) print(x.read(3)) print(x.read(6)) print(x.read(2)) diff --git a/srnemqtt/protocol.py b/srnemqtt/protocol.py index f371db3..be711ef 100644 --- a/srnemqtt/protocol.py +++ b/srnemqtt/protocol.py @@ -6,9 +6,9 @@ from typing import Callable, Collection, Optional from libscrc import modbus # type: ignore -from .constants import ACTION_READ, POSSIBLE_MARKER +from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER from .interfaces import BaseInterface -from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem +from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, ChargerState, DataItem from .util import log @@ -19,9 +19,14 @@ def write(fh, data): fh.write(data + bcrc) -def construct_request(address, words=1, action=ACTION_READ, marker=0xFF): +def construct_read_request(address, words=1, marker=0xFF): assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}" - return struct.pack("!BBHH", marker, action, address, words) + return struct.pack("!BBHH", marker, ACTION_READ, address, words) + + +def construct_write_request(address, marker=0xFF): + assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}" + return struct.pack("!BBH", marker, ACTION_WRITE, address) def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict: @@ -87,7 +92,6 @@ def discardUntil(fh: BaseInterface, byte: int, timeout=10) -> Optional[int]: if not discarded: log("Discarding", end="") discarded += 1 - print(read_byte) print(f" {read_byte:02X}", end="") sys.stdout.flush() @@ -106,7 +110,7 @@ def discardUntil(fh: BaseInterface, byte: int, timeout=10) -> Optional[int]: def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[bytes]: # log(f"Reading {words} words from 0x{address:04X}") - request = construct_request(address, words=words) + request = construct_read_request(address, words=words) # log("Request:", request) write(fh, request) @@ -134,6 +138,44 @@ def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[byte return None +# set(255, 266, 1 or 0) +# ff 06 01 0a 00 01 +# CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01" +# CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00" +# REG_LOAD_ENABLE = 0x010A + + +def writeMemory(fh: BaseInterface, address: int, data: bytes): + if len(data) % 2: + raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes") + + header = construct_write_request(address) + write(fh, header + data) + + tag = discardUntil(fh, 0xFF) + if tag is None: + return None + + header = fh.read(3) + if header and len(header) == 3: + operation, size, address = header + rdata = fh.read(size * 2) + _crc = fh.read(2) + if rdata and _crc: + try: + crc = struct.unpack_from(" str: + data = readMemory(self.device, 0x18, 3) + if data is None: + raise IOError + + p1 = data[0] + p2 = data[1] + p3 = (data[2] << 8) + data[3] + return f"{p1}-{p2}-{p3}" + + @property + def model(self) -> str: + data = readMemory(self.device, 0x0C, 8) + if data is None: + raise IOError + + return data.decode("utf-8").strip() + + @property + def version(self) -> str: + data = readMemory(self.device, 0x14, 4) + if data is None: + raise IOError + + major = (data[0] << 8) + data[1] + minor = data[2] + patch = data[3] + + return f"{major}.{minor}.{patch}" + + @property + def load_enabled(self) -> bool: + data = readMemory(self.device, 0x010A, 1) + if data is None: + raise IOError + + return struct.unpack("x?", data)[0] + + @load_enabled.setter + def load_enabled(self, value: bool): + data = writeMemory(self.device, 0x010A, struct.pack("x?", value)) + if data is not None: + res = struct.unpack("x?", data)[0] + if res != value: + log(f"setting load_enabled failed; {res!r} != {value!r}") + else: + log("setting load_enabled failed; communications error") + + @property + def state(self) -> ChargerState: + raise NotImplementedError + """ + data = try_read_parse(dev, 0x0100, 11, parse_battery_state) + if data: + data[DataName.CALCULATED_BATTERY_POWER] = float( + Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0))) + * Decimal(str(data.get(DataName.BATTERY_CURRENT, 0))) + ) + data[DataName.CALCULATED_PANEL_POWER] = float( + Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0))) + * Decimal(str(data.get(DataName.PANEL_CURRENT, 0))) + ) + data[DataName.CALCULATED_LOAD_POWER] = float( + Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0))) + * Decimal(str(data.get(DataName.LOAD_CURRENT, 0))) + ) + log(data) + for consumer in consumers: + consumer.write(data) + """ diff --git a/srnemqtt/solar_types.py b/srnemqtt/solar_types.py index c79b967..78276ec 100644 --- a/srnemqtt/solar_types.py +++ b/srnemqtt/solar_types.py @@ -119,3 +119,8 @@ HISTORICAL_DATA = [ DataItem(DataName.TOTAL_PRODUCTION_ENERGY, "L", "Wh"), DataItem(DataName.TOTAL_CONSUMPTION_ENERGY, "L", "Wh"), ] + + +class ChargerState: + def __init__(self, data: bytes | bytearray | memoryview) -> None: + raise NotImplementedError