104 lines
3.1 KiB
Python
104 lines
3.1 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
import struct
|
||
|
from decimal import Decimal
|
||
|
from functools import cached_property
|
||
|
from typing import Optional
|
||
|
|
||
|
from .interfaces import BaseInterface
|
||
|
from .protocol import (
|
||
|
parse_battery_state,
|
||
|
parse_historical_entry,
|
||
|
readMemory,
|
||
|
try_read_parse,
|
||
|
writeMemory,
|
||
|
)
|
||
|
from .solar_types import DataName
|
||
|
|
||
|
|
||
|
class Srne:
|
||
|
_dev: BaseInterface
|
||
|
|
||
|
def __init__(self, dev: BaseInterface) -> None:
|
||
|
self._dev = dev
|
||
|
|
||
|
def get_historical_entry(self, day: Optional[int] = None) -> dict:
|
||
|
address = 0x010B
|
||
|
words = 21
|
||
|
if day is not None:
|
||
|
address = 0xF000 + day
|
||
|
res = try_read_parse(self._dev, address, words, parse_historical_entry)
|
||
|
|
||
|
if res is None:
|
||
|
raise TimeoutError("Timeout reading historical entry")
|
||
|
return res
|
||
|
|
||
|
def run_days(self) -> int:
|
||
|
return self.get_historical_entry()["run_days"]
|
||
|
|
||
|
def get_battery_state(self) -> dict:
|
||
|
data = try_read_parse(self._dev, 0x0100, 11, parse_battery_state)
|
||
|
|
||
|
if data is None:
|
||
|
raise TimeoutError("Timeout reading battery state")
|
||
|
|
||
|
data[DataName.CALCULATED_BATTERY_POWER] = float(
|
||
|
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
|
||
|
* Decimal(str(data.get(DataName.BATTERY_CURRENT, 0)))
|
||
|
)
|
||
|
data[DataName.CALCULATED_PANEL_POWER] = float(
|
||
|
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
|
||
|
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
|
||
|
)
|
||
|
data[DataName.CALCULATED_LOAD_POWER] = float(
|
||
|
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
|
||
|
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
|
||
|
)
|
||
|
return data
|
||
|
|
||
|
@cached_property
|
||
|
def model(self) -> str:
|
||
|
data = readMemory(self._dev, address=0x000C, words=8)
|
||
|
if data is None:
|
||
|
raise TimeoutError("Timeout reading model")
|
||
|
|
||
|
return data.decode().strip()
|
||
|
|
||
|
@cached_property
|
||
|
def version(self) -> str:
|
||
|
data = readMemory(self._dev, address=0x0014, words=2)
|
||
|
if data is None:
|
||
|
raise TimeoutError("Timeout reading version")
|
||
|
|
||
|
return "{}.{}.{}".format(*struct.unpack("!HBB", data))
|
||
|
|
||
|
@cached_property
|
||
|
def serial(self) -> str:
|
||
|
data = readMemory(self._dev, address=0x0018, words=2)
|
||
|
if data is None:
|
||
|
raise TimeoutError("Timeout reading serial")
|
||
|
|
||
|
return "{:02n}-{:02n}-{:04n}".format(*struct.unpack("!BBH", data))
|
||
|
|
||
|
@property
|
||
|
def load_enabled(self) -> bool:
|
||
|
data = readMemory(self._dev, address=0x010A)
|
||
|
if data is None:
|
||
|
raise TimeoutError("Timeout reading serial")
|
||
|
|
||
|
return bool(struct.unpack("!xB", data)[0])
|
||
|
|
||
|
def enable_load(self, enable: bool) -> bool:
|
||
|
data = writeMemory(self._dev, 0x010A, bytes((0, enable)))
|
||
|
if data is None:
|
||
|
raise TimeoutError("Timeout reading serial")
|
||
|
print(data)
|
||
|
return bool(struct.unpack("!xB", data)[0])
|
||
|
|
||
|
@cached_property
|
||
|
def name(self) -> str:
|
||
|
data = readMemory(self._dev, address=0x0049, words=16)
|
||
|
if data is None:
|
||
|
raise TimeoutError("Timeout reading name")
|
||
|
|
||
|
return data.decode("utf-16be").strip()
|