From f0c20574288d44b46326321831f627f797d6a213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Odd=20Str=C3=A5b=C3=B8?= Date: Sun, 10 Dec 2023 13:51:07 +0100 Subject: [PATCH] Implement more getters in ChargeController --- srnemqtt/protocol.py | 57 ++++++++----- srnemqtt/solar_types.py | 180 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 213 insertions(+), 24 deletions(-) diff --git a/srnemqtt/protocol.py b/srnemqtt/protocol.py index 50cfc06..4229a4a 100644 --- a/srnemqtt/protocol.py +++ b/srnemqtt/protocol.py @@ -8,7 +8,14 @@ from libscrc import modbus # type: ignore from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER from .interfaces import BaseInterface -from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, ChargerState, DataItem +from .solar_types import ( + DATA_BATTERY_STATE, + HISTORICAL_DATA, + ChargerState, + DataItem, + HistoricalData, + HistoricalExtraInfo, +) from .util import log @@ -259,23 +266,31 @@ class ChargeController: @property def state(self) -> ChargerState: - raise NotImplementedError - """ - data = try_read_parse(dev, 0x0100, 11, parse_battery_state) - if data: - data[DataName.CALCULATED_BATTERY_POWER] = float( - Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0))) - * Decimal(str(data.get(DataName.BATTERY_CURRENT, 0))) - ) - data[DataName.CALCULATED_PANEL_POWER] = float( - Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0))) - * Decimal(str(data.get(DataName.PANEL_CURRENT, 0))) - ) - data[DataName.CALCULATED_LOAD_POWER] = float( - Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0))) - * Decimal(str(data.get(DataName.LOAD_CURRENT, 0))) - ) - log(data) - for consumer in consumers: - consumer.write(data) - """ + data = readMemory(self.device, 0x0100, 11) + if data is None: + raise IOError # FIXME: Raise specific error in readMemory + + return ChargerState(data) + + def get_historical(self, day) -> HistoricalData: + data = readMemory(self.device, 0xF000 + day, 10) + if data is None: + raise IOError # FIXME: Raise specific error in readMemory + + return HistoricalData(data) + + @property + def today(self) -> HistoricalData: + data = readMemory(self.device, 0x010B, 10) + if data is None: + raise IOError # FIXME: Raise specific error in readMemory + + return HistoricalData(data) + + @property + def extra(self) -> HistoricalExtraInfo: + data = readMemory(self.device, 0x0115, 11) + if data is None: + raise IOError # FIXME: Raise specific error in readMemory + + return HistoricalExtraInfo(data) diff --git a/srnemqtt/solar_types.py b/srnemqtt/solar_types.py index 78276ec..daf62bc 100644 --- a/srnemqtt/solar_types.py +++ b/srnemqtt/solar_types.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- import struct +from abc import ABC, abstractmethod from enum import Enum, unique -from typing import Callable, Optional +from typing import Any, Callable, Dict, Optional @unique @@ -111,6 +112,7 @@ HISTORICAL_DATA = [ DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"), DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"), DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"), + # DataItem(DataName.RUN_DAYS, "H"), DataItem(DataName.DISCHARGE_COUNT, "H"), DataItem(DataName.FULL_CHARGE_COUNT, "H"), @@ -121,6 +123,178 @@ HISTORICAL_DATA = [ ] -class ChargerState: +class DecodedData(ABC): + @abstractmethod def __init__(self, data: bytes | bytearray | memoryview) -> None: - raise NotImplementedError + ... + + @abstractmethod + def as_dict(self) -> Dict[DataName, Any]: + ... + + +class ChargerState(DecodedData): + battery_charge: int + battery_voltage: float + battery_current: float + internal_temperature: int + battery_temperature: int + load_voltage: float + load_current: float + load_power: float + panel_voltage: float + panel_current: float + panel_power: float + load_enabled: bool + + def __init__(self, data: bytes | bytearray | memoryview) -> None: + ( + _battery_charge, + _battery_voltage, + _battery_current, + _internal_temperature, + _battery_temperature, + _load_voltage, + _load_current, + _load_power, + _panel_voltage, + _panel_current, + _panel_power, + _load_enabled, + ) = struct.unpack("HHHBBHHHHHHx?", data) + + self.battery_charge = _battery_charge + self.battery_voltage = _battery_voltage / 10 + self.battery_current = _battery_current / 100 + self.internal_temperature = parse_temperature(_internal_temperature) + self.battery_temperature = parse_temperature(_battery_temperature) + self.load_voltage = _load_voltage / 10 + self.load_current = _load_current / 100 + self.load_power = _load_power + self.panel_voltage = _panel_voltage / 10 + self.panel_current = _panel_current / 100 + self.panel_power = _panel_power + self.load_enabled = bool(_load_enabled) + + @property + def calculated_battery_power(self) -> float: + return self.battery_voltage * self.battery_current + + @property + def calculated_panel_power(self) -> float: + return self.panel_voltage * self.panel_current + + @property + def calculated_load_power(self) -> float: + return self.load_voltage * self.load_current + + def as_dict(self): + return { + DataName.BATTERY_CHARGE: self.battery_charge, + DataName.BATTERY_VOLTAGE: self.battery_voltage, + DataName.BATTERY_CURRENT: self.battery_current, + DataName.INTERNAL_TEMPERATURE: self.internal_temperature, + DataName.BATTERY_TEMPERATURE: self.battery_temperature, + DataName.LOAD_VOLTAGE: self.load_voltage, + DataName.LOAD_CURRENT: self.load_current, + DataName.LOAD_POWER: self.load_power, + DataName.PANEL_VOLTAGE: self.panel_voltage, + DataName.PANEL_CURRENT: self.panel_current, + DataName.PANEL_POWER: self.panel_power, + DataName.LOAD_ENABLED: self.load_enabled, + DataName.CALCULATED_BATTERY_POWER: self.calculated_battery_power, + DataName.CALCULATED_PANEL_POWER: self.calculated_panel_power, + DataName.CALCULATED_LOAD_POWER: self.calculated_load_power, + } + + +class HistoricalData(DecodedData): + battery_voltage_min: float + battery_voltage_max: float + charge_max_current: float + _discharge_max_current: float + charge_max_power: int + discharge_max_power: int + charge_amp_hour: int + discharge_amp_hour: int + production_energy: int + consumption_energy: int + + def __init__(self, data: bytes | bytearray | memoryview) -> None: + ( + _battery_voltage_min, + _battery_voltage_max, + _charge_max_current, + __discharge_max_current, + _charge_max_power, + _discharge_max_power, + _charge_amp_hour, + _discharge_amp_hour, + _production_energy, + _consumption_energy, + ) = struct.unpack("HHHHHHHHHH", data) + + self.battery_voltage_min = _battery_voltage_min / 10 + self.battery_voltage_max = _battery_voltage_max / 10 + self.charge_max_current = _charge_max_current / 100 + self._discharge_max_current = __discharge_max_current / 100 + self.charge_max_power = _charge_max_power + self.discharge_max_power = _discharge_max_power + self.charge_amp_hour = _charge_amp_hour + self.discharge_amp_hour = _discharge_amp_hour + self.production_energy = _production_energy + self.consumption_energy = _consumption_energy + + def as_dict(self): + return { + DataName.BATTERY_VOLTAGE_MIN: self.battery_voltage_min, + DataName.BATTERY_VOLTAGE_MAX: self.battery_voltage_max, + DataName.CHARGE_MAX_CURRENT: self.charge_max_current, + DataName._DISCHARGE_MAX_CURRENT: self._discharge_max_current, + DataName.CHARGE_MAX_POWER: self.charge_max_power, + DataName.DISCHARGE_MAX_POWER: self.discharge_max_power, + DataName.CHARGE_AMP_HOUR: self.charge_amp_hour, + DataName.DISCHARGE_AMP_HOUR: self.discharge_amp_hour, + DataName.PRODUCTION_ENERGY: self.production_energy, + DataName.CONSUMPTION_ENERGY: self.consumption_energy, + } + + +class HistoricalExtraInfo(DecodedData): + run_days: int + discharge_count: int + full_charge_count: int + total_charge_amp_hours: int + total_discharge_amp_hours: int + total_production_energy: int + total_consumption_energy: int + + def __init__(self, data: bytes | bytearray | memoryview) -> None: + ( + _run_days, + _discharge_count, + _full_charge_count, + _total_charge_amp_hours, + _total_discharge_amp_hours, + _total_production_energy, + _total_consumption_energy, + ) = struct.unpack("HHHLLLL", data) + + self.run_days = _run_days + self.discharge_count = _discharge_count + self.full_charge_count = _full_charge_count + self.total_charge_amp_hours = _total_charge_amp_hours + self.total_discharge_amp_hours = _total_discharge_amp_hours + self.total_production_energy = _total_production_energy + self.total_consumption_energy = _total_consumption_energy + + def as_dict(self): + return { + DataName.RUN_DAYS: self.run_days, + DataName.DISCHARGE_COUNT: self.discharge_count, + DataName.FULL_CHARGE_COUNT: self.full_charge_count, + DataName.TOTAL_CHARGE_AMP_HOURS: self.total_charge_amp_hours, + DataName.TOTAL_DISCHARGE_AMP_HOURS: self.total_discharge_amp_hours, + DataName.TOTAL_PRODUCTION_ENERGY: self.total_production_energy, + DataName.TOTAL_CONSUMPTION_ENERGY: self.total_consumption_energy, + }