srne-mqtt/srnemqtt/srne.py

104 lines
3.1 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
import struct
from decimal import Decimal
from functools import cached_property
from typing import Optional
from .interfaces import BaseInterface
from .protocol import (
parse_battery_state,
parse_historical_entry,
readMemory,
try_read_parse,
writeMemory,
)
from .solar_types import DataName
class Srne:
_dev: BaseInterface
def __init__(self, dev: BaseInterface) -> None:
self._dev = dev
def get_historical_entry(self, day: Optional[int] = None) -> dict:
address = 0x010B
words = 21
if day is not None:
address = 0xF000 + day
res = try_read_parse(self._dev, address, words, parse_historical_entry)
if res is None:
raise TimeoutError("Timeout reading historical entry")
return res
def run_days(self) -> int:
return self.get_historical_entry()["run_days"]
def get_battery_state(self) -> dict:
data = try_read_parse(self._dev, 0x0100, 11, parse_battery_state)
if data is None:
raise TimeoutError("Timeout reading battery state")
data[DataName.CALCULATED_BATTERY_POWER] = float(
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.BATTERY_CURRENT, 0)))
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
return data
@cached_property
def model(self) -> str:
data = readMemory(self._dev, address=0x000C, words=8)
if data is None:
raise TimeoutError("Timeout reading model")
return data.decode().strip()
@cached_property
def version(self) -> str:
data = readMemory(self._dev, address=0x0014, words=2)
if data is None:
raise TimeoutError("Timeout reading version")
return "{}.{}.{}".format(*struct.unpack("!HBB", data))
@cached_property
def serial(self) -> str:
data = readMemory(self._dev, address=0x0018, words=2)
if data is None:
raise TimeoutError("Timeout reading serial")
return "{:02n}-{:02n}-{:04n}".format(*struct.unpack("!BBH", data))
@property
def load_enabled(self) -> bool:
data = readMemory(self._dev, address=0x010A)
if data is None:
raise TimeoutError("Timeout reading serial")
return bool(struct.unpack("!xB", data)[0])
def enable_load(self, enable: bool) -> bool:
data = writeMemory(self._dev, 0x010A, bytes((0, enable)))
if data is None:
raise TimeoutError("Timeout reading serial")
print(data)
return bool(struct.unpack("!xB", data)[0])
@cached_property
def name(self) -> str:
data = readMemory(self._dev, address=0x0049, words=16)
if data is None:
raise TimeoutError("Timeout reading name")
return data.decode("utf-16be").strip()