# -*- coding: utf-8 -*- import struct from decimal import Decimal from functools import cached_property from typing import Optional from .interfaces import BaseInterface from .protocol import ( parse_battery_state, parse_historical_entry, readMemory, try_read_parse, writeMemory, ) from .solar_types import DataName class Srne: _dev: BaseInterface def __init__(self, dev: BaseInterface) -> None: self._dev = dev def get_historical_entry(self, day: Optional[int] = None) -> dict: address = 0x010B words = 21 if day is not None: address = 0xF000 + day res = try_read_parse(self._dev, address, words, parse_historical_entry) if res is None: raise TimeoutError("Timeout reading historical entry") return res def run_days(self) -> int: return self.get_historical_entry()["run_days"] def get_battery_state(self) -> dict: data = try_read_parse(self._dev, 0x0100, 11, parse_battery_state) if data is None: raise TimeoutError("Timeout reading battery state") data[DataName.CALCULATED_BATTERY_POWER] = float( Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0))) * Decimal(str(data.get(DataName.BATTERY_CURRENT, 0))) ) data[DataName.CALCULATED_PANEL_POWER] = float( Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0))) * Decimal(str(data.get(DataName.PANEL_CURRENT, 0))) ) data[DataName.CALCULATED_LOAD_POWER] = float( Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0))) * Decimal(str(data.get(DataName.LOAD_CURRENT, 0))) ) return data @cached_property def model(self) -> str: data = readMemory(self._dev, address=0x000C, words=8) if data is None: raise TimeoutError("Timeout reading model") return data.decode().strip() @cached_property def version(self) -> str: data = readMemory(self._dev, address=0x0014, words=2) if data is None: raise TimeoutError("Timeout reading version") return "{}.{}.{}".format(*struct.unpack("!HBB", data)) @cached_property def serial(self) -> str: data = readMemory(self._dev, address=0x0018, words=2) if data is None: raise TimeoutError("Timeout reading serial") return "{:02n}-{:02n}-{:04n}".format(*struct.unpack("!BBH", data)) @property def load_enabled(self) -> bool: data = readMemory(self._dev, address=0x010A) if data is None: raise TimeoutError("Timeout reading serial") return bool(struct.unpack("!xB", data)[0]) def enable_load(self, enable: bool) -> bool: data = writeMemory(self._dev, 0x010A, bytes((0, enable))) if data is None: raise TimeoutError("Timeout reading serial") print(data) return bool(struct.unpack("!xB", data)[0]) @cached_property def name(self) -> str: data = readMemory(self._dev, address=0x0049, words=16) if data is None: raise TimeoutError("Timeout reading name") return data.decode("utf-16be").strip()