Compare commits

..

7 commits

7 changed files with 389 additions and 107 deletions

View file

@ -20,7 +20,7 @@ HISTORICAL_KEYS = {
DataName.BATTERY_VOLTAGE_MIN, DataName.BATTERY_VOLTAGE_MIN,
DataName.BATTERY_VOLTAGE_MAX, DataName.BATTERY_VOLTAGE_MAX,
DataName.CHARGE_MAX_CURRENT, DataName.CHARGE_MAX_CURRENT,
DataName._DISCHARGE_MAX_CURRENT, DataName.DISCHARGE_MAX_CURRENT,
DataName.CHARGE_MAX_POWER, DataName.CHARGE_MAX_POWER,
DataName.DISCHARGE_MAX_POWER, DataName.DISCHARGE_MAX_POWER,
DataName.CHARGE_AMP_HOUR, DataName.CHARGE_AMP_HOUR,
@ -58,7 +58,7 @@ KNOWN_KEYS = HISTORICAL_KEYS.union(INSTANT_KEYS)
MAP = { MAP = {
"_internal_temperature?": "internal_temp", "_internal_temperature?": "internal_temp",
"unknown1": "charge_max_current", "unknown1": "charge_max_current",
"unknown2": "_discharge_max_current?", "unknown2": "discharge_max_current",
"internal_temperature": "internal_temp", "internal_temperature": "internal_temp",
"battery_temperature": "battery_temp", "battery_temperature": "battery_temp",
} }

View file

@ -20,3 +20,7 @@ if __name__ == "__main__":
sleep(5) sleep(5)
cc.load_enabled = False cc.load_enabled = False
print(f"Load enabled: {cc.load_enabled}") print(f"Load enabled: {cc.load_enabled}")
# print(f"Name: {cc.name}")
# cc.name = "☀️ 🔌🔋Charger"
# print(f"Name: {cc.name}")

View file

@ -2,15 +2,12 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import time import time
from decimal import Decimal
from typing import cast
from bluepy.btle import BTLEDisconnectError # type: ignore from bluepy.btle import BTLEDisconnectError # type: ignore
from serial import SerialException # type: ignore from serial import SerialException # type: ignore
from .config import get_config, get_consumers, get_interface from .config import get_config, get_consumers, get_interface
from .protocol import parse_battery_state, parse_historical_entry, try_read_parse from .protocol import ChargeController
from .solar_types import DataName
from .util import Periodical, log from .util import Periodical, log
@ -35,67 +32,56 @@ def main():
with get_interface() as dev: with get_interface() as dev:
log("Connected.") log("Connected.")
cc = ChargeController(dev)
log(f"Controller model: {cc.model}")
log(f"Controller version: {cc.version}")
log(f"Controller serial: {cc.serial}")
for consumer in consumers:
consumer.controller = cc
# write(dev, construct_request(0, 32)) # write(dev, construct_request(0, 32))
# Memory dump # Memory dump
# for address in range(0, 0x10000, 16): # for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...") # log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16)) # write(wd, construct_request(address, 16))
days = 7 extra = cc.extra
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry) days = extra.run_days
if res:
log(res) res = cc.today.as_dict()
for consumer in consumers: res.update(extra.as_dict())
consumer.write(res) for consumer in consumers:
days = cast(int, res.get("run_days", 7)) consumer.write(res)
del extra
for i in range(days): for i in range(days):
res = try_read_parse( hist = cc.get_historical(i)
dev, 0xF000 + i, 10, parse_historical_entry res = hist.as_dict()
) log({i: res})
if res: for consumer in consumers:
log({i: res}) consumer.write({str(i): res})
for consumer in consumers:
consumer.write({str(i): res})
while True: while True:
now = time.time() now = time.time()
if per_voltages(now): if per_voltages(now):
data = try_read_parse(dev, 0x0100, 11, parse_battery_state) data = cc.state.as_dict()
if data: log(data)
data[DataName.CALCULATED_BATTERY_POWER] = float( for consumer in consumers:
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0))) consumer.write(data)
* Decimal(
str(data.get(DataName.BATTERY_CURRENT, 0))
)
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
log(data)
for consumer in consumers:
consumer.write(data)
if per_current_hist(now): if per_current_hist(now):
data = try_read_parse( data = cc.today.as_dict()
dev, 0x010B, 21, parse_historical_entry data.update(cc.extra.as_dict())
) log(data)
if data: for consumer in consumers:
log(data) consumer.write(data)
for consumer in consumers:
consumer.write(data)
# print(".") # print(".")
for consumer in consumers: for consumer in consumers:
consumer.poll() consumer.poll()
time.sleep(max(0, 1 - time.time() - now)) time.sleep(max(0, 1 - (time.time() - now)))
# if STATUS.get('load_enabled'): # if STATUS.get('load_enabled'):
# write(wd, CMD_DISABLE_LOAD) # write(wd, CMD_DISABLE_LOAD)

View file

@ -2,9 +2,12 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Dict from typing import Any, Dict
from ..protocol import ChargeController
class BaseConsumer(ABC): class BaseConsumer(ABC):
settings: Dict[str, Any] settings: Dict[str, Any]
controller: ChargeController | None = None
@abstractmethod @abstractmethod
def __init__(self, settings: Dict[str, Any]) -> None: def __init__(self, settings: Dict[str, Any]) -> None:

View file

@ -13,7 +13,7 @@ MAP_VALUES: Dict[DataName, Dict[str, Any]] = {
# DataName.BATTERY_VOLTAGE_MIN: {}, # DataName.BATTERY_VOLTAGE_MIN: {},
# DataName.BATTERY_VOLTAGE_MAX: {}, # DataName.BATTERY_VOLTAGE_MAX: {},
# DataName.CHARGE_MAX_CURRENT: {}, # DataName.CHARGE_MAX_CURRENT: {},
# DataName._DISCHARGE_MAX_CURRENT: {}, # DataName.DISCHARGE_MAX_CURRENT: {},
# DataName.CHARGE_MAX_POWER: {}, # DataName.CHARGE_MAX_POWER: {},
# DataName.DISCHARGE_MAX_POWER: {}, # DataName.DISCHARGE_MAX_POWER: {},
# DataName.CHARGE_AMP_HOUR: {}, # DataName.CHARGE_AMP_HOUR: {},
@ -116,28 +116,37 @@ PayloadType: TypeAlias = str | bytes | bytearray | int | float | None
class MqttConsumer(BaseConsumer): class MqttConsumer(BaseConsumer):
client: mqtt.Client
initialized: List[str] initialized: List[str]
_client: mqtt.Client | None = None
def __init__(self, settings: Dict[str, Any]) -> None: def __init__(self, settings: Dict[str, Any]) -> None:
self.initialized = [] self.initialized = []
super().__init__(settings) super().__init__(settings)
self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
self.client.on_connect = self.on_connect @property
self.client.on_message = self.on_message def client(self) -> mqtt.Client:
self.client.on_disconnect = self.on_disconnect if self._client is not None:
self.client.on_connect_fail = self.on_connect_fail return self._client
self._client = mqtt.Client(
client_id=self.settings["client"]["id"], userdata=self
)
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
self._client.on_disconnect = self.on_disconnect
self._client.on_connect_fail = self.on_connect_fail
# Will must be set before connecting!! # Will must be set before connecting!!
self.client.will_set( self._client.will_set(
f"{self.topic_prefix}/available", payload="offline", retain=True f"{self.topic_prefix}/available", payload="offline", retain=True
) )
while True: while True:
try: try:
self.client.connect( self._client.connect(
settings["client"]["host"], self.settings["client"]["host"],
settings["client"]["port"], self.settings["client"]["port"],
settings["client"]["keepalive"], self.settings["client"]["keepalive"],
) )
break break
except OSError as err: except OSError as err:
@ -151,6 +160,7 @@ class MqttConsumer(BaseConsumer):
raise raise
print(err) print(err)
sleep(0.1) sleep(0.1)
return self._client
def config(self, settings: Dict[str, Any]): def config(self, settings: Dict[str, Any]):
super().config(settings) super().config(settings)
@ -167,9 +177,19 @@ class MqttConsumer(BaseConsumer):
settings.setdefault("discovery_prefix", "homeassistant") settings.setdefault("discovery_prefix", "homeassistant")
_controller_id: str | None = None
@property
def controller_id(self) -> str:
assert self.controller is not None
# Controller serial is fetched from device, cache it.
if self._controller_id is None:
self._controller_id = self.controller.serial
return f"{self.controller.manufacturer_id}_{self._controller_id}"
@property @property
def topic_prefix(self): def topic_prefix(self):
return f"{self.settings['prefix']}/{self.settings['device_id']}" return f"{self.settings['prefix']}/{self.controller_id}"
def get_ha_config( def get_ha_config(
self, self,
@ -181,21 +201,25 @@ class MqttConsumer(BaseConsumer):
state_class: Optional[str] = None, state_class: Optional[str] = None,
): ):
assert state_class in [None, "measurement", "total", "total_increasing"] assert state_class in [None, "measurement", "total", "total_increasing"]
assert self.controller is not None
res = { res = {
"~": f"{self.topic_prefix}", "~": f"{self.topic_prefix}",
"unique_id": f"{self.settings['device_id']}_{id}", "unique_id": f"{self.controller_id}_{id}",
"object_id": f"{self.controller_id}_{id}",
"availability_topic": "~/available", "availability_topic": "~/available",
"state_topic": f"~/{id}", "state_topic": f"~/{id}",
"name": name, "name": name,
"device": { "device": {
"identifiers": [ "identifiers": [
self.settings["device_id"], self.controller_id,
], ],
# TODO: Get charger serial and use for identifier instead "manufacturer": self.controller.manufacturer,
# See: https://www.home-assistant.io/integrations/sensor.mqtt/#device "model": self.controller.model,
# "via_device": self.settings["device_id"], "hw_version": self.controller.version,
"via_device": self.settings["device_id"],
"suggested_area": "Solar panel", "suggested_area": "Solar panel",
"name": self.controller.name,
}, },
"force_update": True, "force_update": True,
"expire_after": expiry, "expire_after": expiry,
@ -253,22 +277,25 @@ class MqttConsumer(BaseConsumer):
def write(self, data: Dict[str, PayloadType]): def write(self, data: Dict[str, PayloadType]):
self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data)) self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))
for k, v in data.items(): for dataname, data_value in data.items():
if k in MAP_VALUES: if dataname in MAP_VALUES:
if k not in self.initialized: if dataname not in self.initialized:
km = MAP_VALUES[DataName(k)] km = MAP_VALUES[DataName(dataname)]
pretty_name = k.replace("_", " ").capitalize() pretty_name = dataname.replace("_", " ").capitalize()
disc_prefix = self.settings["discovery_prefix"] disc_prefix = self.settings["discovery_prefix"]
device_id = self.settings["device_id"]
self.client.publish( self.client.publish(
f"{disc_prefix}/sensor/{device_id}_{k}/config", f"{disc_prefix}/sensor/{self.controller_id}/{dataname}/config",
payload=json.dumps(self.get_ha_config(k, pretty_name, **km)), payload=json.dumps(
self.get_ha_config(dataname, pretty_name, **km)
),
retain=True, retain=True,
) )
self.initialized.append(k) self.initialized.append(dataname)
self.client.publish(f"{self.topic_prefix}/{k}", v, retain=True) self.client.publish(
f"{self.topic_prefix}/{dataname}", data_value, retain=True
)
def exit(self): def exit(self):
self.client.publish( self.client.publish(

View file

@ -8,7 +8,14 @@ from libscrc import modbus # type: ignore
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
from .interfaces import BaseInterface from .interfaces import BaseInterface
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, ChargerState, DataItem from .solar_types import (
DATA_BATTERY_STATE,
HISTORICAL_DATA,
ChargerState,
DataItem,
HistoricalData,
HistoricalExtraInfo,
)
from .util import log from .util import log
@ -146,8 +153,8 @@ def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[byte
def writeMemory(fh: BaseInterface, address: int, data: bytes): def writeMemory(fh: BaseInterface, address: int, data: bytes):
if len(data) % 2: if len(data) != 2:
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes") raise ValueError(f"Data must consist of a two-byte word, got {len(data)} bytes")
header = construct_write_request(address) header = construct_write_request(address)
write(fh, header + data) write(fh, header + data)
@ -159,7 +166,11 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
header = fh.read(3) header = fh.read(3)
if header and len(header) == 3: if header and len(header) == 3:
operation, size, address = header operation, size, address = header
rdata = fh.read(size * 2) log(header)
# size field is zero when writing device name for whatever reason
# write command seems to only accept a single word, so this is fine;
# we just hardcode the number of bytes read to two here.
rdata = fh.read(2)
_crc = fh.read(2) _crc = fh.read(2)
if rdata and _crc: if rdata and _crc:
try: try:
@ -176,6 +187,19 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
return None return None
def writeMemoryMultiple(fh: BaseInterface, address: int, data: bytes):
if len(data) % 2:
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
res = bytearray()
for i in range(len(data) // 2):
d = data[i * 2 : (i + 1) * 2]
log(address + i, d)
r = writeMemory(fh, address + i, d)
if r:
res.extend(r)
return res
def try_read_parse( def try_read_parse(
dev: BaseInterface, dev: BaseInterface,
address: int, address: int,
@ -205,11 +229,19 @@ def try_read_parse(
class ChargeController: class ChargeController:
device: BaseInterface device: BaseInterface
manufacturer: str = "SRNE Solar Co., Ltd."
manufacturer_id: str = "srne"
def __init__(self, device: BaseInterface): def __init__(self, device: BaseInterface):
self.device = device self.device = device
_cached_serial: str | None = None
@property @property
def serial(self) -> str: def serial(self) -> str:
if self._cached_serial is not None:
return self._cached_serial
data = readMemory(self.device, 0x18, 3) data = readMemory(self.device, 0x18, 3)
if data is None: if data is None:
raise IOError # FIXME: Raise specific error in readMemory raise IOError # FIXME: Raise specific error in readMemory
@ -217,18 +249,31 @@ class ChargeController:
p1 = data[0] p1 = data[0]
p2 = data[1] p2 = data[1]
p3 = (data[2] << 8) + data[3] p3 = (data[2] << 8) + data[3]
return f"{p1}-{p2}-{p3}"
self._cached_serial = f"{p1}-{p2}-{p3}"
return self._cached_serial
_cached_model: str | None = None
@property @property
def model(self) -> str: def model(self) -> str:
if self._cached_model is not None:
return self._cached_model
data = readMemory(self.device, 0x0C, 8) data = readMemory(self.device, 0x0C, 8)
if data is None: if data is None:
raise IOError # FIXME: Raise specific error in readMemory raise IOError # FIXME: Raise specific error in readMemory
return data.decode("utf-8").strip() self._cached_model = data.decode("utf-8").strip()
return self._cached_model
_cached_version: str | None = None
@property @property
def version(self) -> str: def version(self) -> str:
if self._cached_version is not None:
return self._cached_version
data = readMemory(self.device, 0x14, 4) data = readMemory(self.device, 0x14, 4)
if data is None: if data is None:
raise IOError # FIXME: Raise specific error in readMemory raise IOError # FIXME: Raise specific error in readMemory
@ -237,7 +282,42 @@ class ChargeController:
minor = data[2] minor = data[2]
patch = data[3] patch = data[3]
return f"{major}.{minor}.{patch}" self._cached_version = f"{major}.{minor}.{patch}"
return self._cached_version
_cached_name: str | None = None
@property
def name(self) -> str:
if self._cached_name is not None:
return self._cached_name
data = readMemory(self.device, 0x0049, 16)
if data is None:
raise IOError
res = data.decode("UTF-16BE").strip()
return res
@name.setter
def name(self, value: str):
bin_value = bytearray(value.encode("UTF-16BE"))
if len(bin_value) > 32:
raise ValueError(
f"value must be no more than 32 bytes once encoded as UTF-16BE. {len(bin_value)} bytes supplied"
)
# Pad name to 32 bytes to ensure ensure nothing is left of old name
while len(bin_value) < 32:
bin_value.extend(b"\x00\x20")
print(len(bin_value), bin_value)
data = writeMemoryMultiple(self.device, 0x0049, bin_value)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
res = data.decode("UTF-16BE").strip()
if res != value:
log(f"setting device name failed; {res!r} != {value!r}")
self._cached_name = value
@property @property
def load_enabled(self) -> bool: def load_enabled(self) -> bool:
@ -259,23 +339,31 @@ class ChargeController:
@property @property
def state(self) -> ChargerState: def state(self) -> ChargerState:
raise NotImplementedError data = readMemory(self.device, 0x0100, 11)
""" if data is None:
data = try_read_parse(dev, 0x0100, 11, parse_battery_state) raise IOError # FIXME: Raise specific error in readMemory
if data:
data[DataName.CALCULATED_BATTERY_POWER] = float( return ChargerState(data)
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.BATTERY_CURRENT, 0))) def get_historical(self, day) -> HistoricalData:
) data = readMemory(self.device, 0xF000 + day, 10)
data[DataName.CALCULATED_PANEL_POWER] = float( if data is None:
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0))) raise IOError # FIXME: Raise specific error in readMemory
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
) return HistoricalData(data)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0))) @property
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0))) def today(self) -> HistoricalData:
) data = readMemory(self.device, 0x010B, 10)
log(data) if data is None:
for consumer in consumers: raise IOError # FIXME: Raise specific error in readMemory
consumer.write(data)
""" return HistoricalData(data)
@property
def extra(self) -> HistoricalExtraInfo:
data = readMemory(self.device, 0x0115, 11)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalExtraInfo(data)

View file

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import struct import struct
from abc import ABC, abstractmethod
from enum import Enum, unique from enum import Enum, unique
from typing import Callable, Optional from typing import Any, Callable, Dict, Optional
@unique @unique
@ -21,7 +22,7 @@ class DataName(str, Enum):
BATTERY_VOLTAGE_MIN = "battery_voltage_min" BATTERY_VOLTAGE_MIN = "battery_voltage_min"
BATTERY_VOLTAGE_MAX = "battery_voltage_max" BATTERY_VOLTAGE_MAX = "battery_voltage_max"
CHARGE_MAX_CURRENT = "charge_max_current" CHARGE_MAX_CURRENT = "charge_max_current"
_DISCHARGE_MAX_CURRENT = "_discharge_max_current?" DISCHARGE_MAX_CURRENT = "discharge_max_current"
CHARGE_MAX_POWER = "charge_max_power" CHARGE_MAX_POWER = "charge_max_power"
DISCHARGE_MAX_POWER = "discharge_max_power" DISCHARGE_MAX_POWER = "discharge_max_power"
CHARGE_AMP_HOUR = "charge_amp_hour" CHARGE_AMP_HOUR = "charge_amp_hour"
@ -104,13 +105,14 @@ HISTORICAL_DATA = [
DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10), DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10),
DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10), DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10),
DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100), DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName._DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100), DataItem(DataName.DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.CHARGE_MAX_POWER, "H", "W"), DataItem(DataName.CHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"), DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"), DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"), DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"), DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"),
DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"), DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"),
#
DataItem(DataName.RUN_DAYS, "H"), DataItem(DataName.RUN_DAYS, "H"),
DataItem(DataName.DISCHARGE_COUNT, "H"), DataItem(DataName.DISCHARGE_COUNT, "H"),
DataItem(DataName.FULL_CHARGE_COUNT, "H"), DataItem(DataName.FULL_CHARGE_COUNT, "H"),
@ -121,6 +123,178 @@ HISTORICAL_DATA = [
] ]
class ChargerState: class DecodedData(ABC):
@abstractmethod
def __init__(self, data: bytes | bytearray | memoryview) -> None: def __init__(self, data: bytes | bytearray | memoryview) -> None:
raise NotImplementedError ...
@abstractmethod
def as_dict(self) -> Dict[DataName, Any]:
...
class ChargerState(DecodedData):
battery_charge: int
battery_voltage: float
battery_current: float
internal_temperature: int
battery_temperature: int
load_voltage: float
load_current: float
load_power: float
panel_voltage: float
panel_current: float
panel_power: float
load_enabled: bool
def __init__(self, data: bytes | bytearray | memoryview) -> None:
(
_battery_charge,
_battery_voltage,
_battery_current,
_internal_temperature,
_battery_temperature,
_load_voltage,
_load_current,
_load_power,
_panel_voltage,
_panel_current,
_panel_power,
_load_enabled,
) = struct.unpack("!HHHBBHHHHHHx?", data)
self.battery_charge = _battery_charge
self.battery_voltage = _battery_voltage / 10
self.battery_current = _battery_current / 100
self.internal_temperature = parse_temperature(_internal_temperature)
self.battery_temperature = parse_temperature(_battery_temperature)
self.load_voltage = _load_voltage / 10
self.load_current = _load_current / 100
self.load_power = _load_power
self.panel_voltage = _panel_voltage / 10
self.panel_current = _panel_current / 100
self.panel_power = _panel_power
self.load_enabled = bool(_load_enabled)
@property
def calculated_battery_power(self) -> float:
return self.battery_voltage * self.battery_current
@property
def calculated_panel_power(self) -> float:
return self.panel_voltage * self.panel_current
@property
def calculated_load_power(self) -> float:
return self.load_voltage * self.load_current
def as_dict(self):
return {
DataName.BATTERY_CHARGE: self.battery_charge,
DataName.BATTERY_VOLTAGE: self.battery_voltage,
DataName.BATTERY_CURRENT: self.battery_current,
DataName.INTERNAL_TEMPERATURE: self.internal_temperature,
DataName.BATTERY_TEMPERATURE: self.battery_temperature,
DataName.LOAD_VOLTAGE: self.load_voltage,
DataName.LOAD_CURRENT: self.load_current,
DataName.LOAD_POWER: self.load_power,
DataName.PANEL_VOLTAGE: self.panel_voltage,
DataName.PANEL_CURRENT: self.panel_current,
DataName.PANEL_POWER: self.panel_power,
DataName.LOAD_ENABLED: self.load_enabled,
DataName.CALCULATED_BATTERY_POWER: self.calculated_battery_power,
DataName.CALCULATED_PANEL_POWER: self.calculated_panel_power,
DataName.CALCULATED_LOAD_POWER: self.calculated_load_power,
}
class HistoricalData(DecodedData):
battery_voltage_min: float
battery_voltage_max: float
charge_max_current: float
discharge_max_current: float
charge_max_power: int
discharge_max_power: int
charge_amp_hour: int
discharge_amp_hour: int
production_energy: int
consumption_energy: int
def __init__(self, data: bytes | bytearray | memoryview) -> None:
(
_battery_voltage_min,
_battery_voltage_max,
_charge_max_current,
_discharge_max_current,
_charge_max_power,
_discharge_max_power,
_charge_amp_hour,
_discharge_amp_hour,
_production_energy,
_consumption_energy,
) = struct.unpack("!HHHHHHHHHH", data)
self.battery_voltage_min = _battery_voltage_min / 10
self.battery_voltage_max = _battery_voltage_max / 10
self.charge_max_current = _charge_max_current / 100
self.discharge_max_current = _discharge_max_current / 100
self.charge_max_power = _charge_max_power
self.discharge_max_power = _discharge_max_power
self.charge_amp_hour = _charge_amp_hour
self.discharge_amp_hour = _discharge_amp_hour
self.production_energy = _production_energy
self.consumption_energy = _consumption_energy
def as_dict(self):
return {
DataName.BATTERY_VOLTAGE_MIN: self.battery_voltage_min,
DataName.BATTERY_VOLTAGE_MAX: self.battery_voltage_max,
DataName.CHARGE_MAX_CURRENT: self.charge_max_current,
DataName.DISCHARGE_MAX_CURRENT: self.discharge_max_current,
DataName.CHARGE_MAX_POWER: self.charge_max_power,
DataName.DISCHARGE_MAX_POWER: self.discharge_max_power,
DataName.CHARGE_AMP_HOUR: self.charge_amp_hour,
DataName.DISCHARGE_AMP_HOUR: self.discharge_amp_hour,
DataName.PRODUCTION_ENERGY: self.production_energy,
DataName.CONSUMPTION_ENERGY: self.consumption_energy,
}
class HistoricalExtraInfo(DecodedData):
run_days: int
discharge_count: int
full_charge_count: int
total_charge_amp_hours: int
total_discharge_amp_hours: int
total_production_energy: int
total_consumption_energy: int
def __init__(self, data: bytes | bytearray | memoryview) -> None:
(
_run_days,
_discharge_count,
_full_charge_count,
_total_charge_amp_hours,
_total_discharge_amp_hours,
_total_production_energy,
_total_consumption_energy,
) = struct.unpack("!HHHLLLL", data)
self.run_days = _run_days
self.discharge_count = _discharge_count
self.full_charge_count = _full_charge_count
self.total_charge_amp_hours = _total_charge_amp_hours
self.total_discharge_amp_hours = _total_discharge_amp_hours
self.total_production_energy = _total_production_energy
self.total_consumption_energy = _total_consumption_energy
def as_dict(self):
return {
DataName.RUN_DAYS: self.run_days,
DataName.DISCHARGE_COUNT: self.discharge_count,
DataName.FULL_CHARGE_COUNT: self.full_charge_count,
DataName.TOTAL_CHARGE_AMP_HOURS: self.total_charge_amp_hours,
DataName.TOTAL_DISCHARGE_AMP_HOURS: self.total_discharge_amp_hours,
DataName.TOTAL_PRODUCTION_ENERGY: self.total_production_energy,
DataName.TOTAL_CONSUMPTION_ENERGY: self.total_consumption_energy,
}