Compare commits
7 commits
4bb77c3bb3
...
67a25eeef9
Author | SHA1 | Date | |
---|---|---|---|
67a25eeef9 | |||
6c0f1c3d13 | |||
4dc42ee6f5 | |||
3aa6b13615 | |||
71919fc406 | |||
fe9c6a82ff | |||
f0c2057428 |
7 changed files with 389 additions and 107 deletions
|
@ -20,7 +20,7 @@ HISTORICAL_KEYS = {
|
|||
DataName.BATTERY_VOLTAGE_MIN,
|
||||
DataName.BATTERY_VOLTAGE_MAX,
|
||||
DataName.CHARGE_MAX_CURRENT,
|
||||
DataName._DISCHARGE_MAX_CURRENT,
|
||||
DataName.DISCHARGE_MAX_CURRENT,
|
||||
DataName.CHARGE_MAX_POWER,
|
||||
DataName.DISCHARGE_MAX_POWER,
|
||||
DataName.CHARGE_AMP_HOUR,
|
||||
|
@ -58,7 +58,7 @@ KNOWN_KEYS = HISTORICAL_KEYS.union(INSTANT_KEYS)
|
|||
MAP = {
|
||||
"_internal_temperature?": "internal_temp",
|
||||
"unknown1": "charge_max_current",
|
||||
"unknown2": "_discharge_max_current?",
|
||||
"unknown2": "discharge_max_current",
|
||||
"internal_temperature": "internal_temp",
|
||||
"battery_temperature": "battery_temp",
|
||||
}
|
||||
|
|
|
@ -20,3 +20,7 @@ if __name__ == "__main__":
|
|||
sleep(5)
|
||||
cc.load_enabled = False
|
||||
print(f"Load enabled: {cc.load_enabled}")
|
||||
|
||||
# print(f"Name: {cc.name}")
|
||||
# cc.name = "☀️ 🔌🔋Charger"
|
||||
# print(f"Name: {cc.name}")
|
||||
|
|
|
@ -2,15 +2,12 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import time
|
||||
from decimal import Decimal
|
||||
from typing import cast
|
||||
|
||||
from bluepy.btle import BTLEDisconnectError # type: ignore
|
||||
from serial import SerialException # type: ignore
|
||||
|
||||
from .config import get_config, get_consumers, get_interface
|
||||
from .protocol import parse_battery_state, parse_historical_entry, try_read_parse
|
||||
from .solar_types import DataName
|
||||
from .protocol import ChargeController
|
||||
from .util import Periodical, log
|
||||
|
||||
|
||||
|
@ -35,25 +32,31 @@ def main():
|
|||
with get_interface() as dev:
|
||||
log("Connected.")
|
||||
|
||||
cc = ChargeController(dev)
|
||||
log(f"Controller model: {cc.model}")
|
||||
log(f"Controller version: {cc.version}")
|
||||
log(f"Controller serial: {cc.serial}")
|
||||
for consumer in consumers:
|
||||
consumer.controller = cc
|
||||
|
||||
# write(dev, construct_request(0, 32))
|
||||
|
||||
# Memory dump
|
||||
# for address in range(0, 0x10000, 16):
|
||||
# log(f"Reading 0x{address:04X}...")
|
||||
# write(wd, construct_request(address, 16))
|
||||
days = 7
|
||||
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
|
||||
if res:
|
||||
log(res)
|
||||
extra = cc.extra
|
||||
days = extra.run_days
|
||||
|
||||
res = cc.today.as_dict()
|
||||
res.update(extra.as_dict())
|
||||
for consumer in consumers:
|
||||
consumer.write(res)
|
||||
days = cast(int, res.get("run_days", 7))
|
||||
del extra
|
||||
|
||||
for i in range(days):
|
||||
res = try_read_parse(
|
||||
dev, 0xF000 + i, 10, parse_historical_entry
|
||||
)
|
||||
if res:
|
||||
hist = cc.get_historical(i)
|
||||
res = hist.as_dict()
|
||||
log({i: res})
|
||||
for consumer in consumers:
|
||||
consumer.write({str(i): res})
|
||||
|
@ -62,31 +65,14 @@ def main():
|
|||
now = time.time()
|
||||
|
||||
if per_voltages(now):
|
||||
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
|
||||
if data:
|
||||
data[DataName.CALCULATED_BATTERY_POWER] = float(
|
||||
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
|
||||
* Decimal(
|
||||
str(data.get(DataName.BATTERY_CURRENT, 0))
|
||||
)
|
||||
)
|
||||
data[DataName.CALCULATED_PANEL_POWER] = float(
|
||||
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
|
||||
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
|
||||
)
|
||||
data[DataName.CALCULATED_LOAD_POWER] = float(
|
||||
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
|
||||
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
|
||||
)
|
||||
data = cc.state.as_dict()
|
||||
log(data)
|
||||
for consumer in consumers:
|
||||
consumer.write(data)
|
||||
|
||||
if per_current_hist(now):
|
||||
data = try_read_parse(
|
||||
dev, 0x010B, 21, parse_historical_entry
|
||||
)
|
||||
if data:
|
||||
data = cc.today.as_dict()
|
||||
data.update(cc.extra.as_dict())
|
||||
log(data)
|
||||
for consumer in consumers:
|
||||
consumer.write(data)
|
||||
|
@ -95,7 +81,7 @@ def main():
|
|||
for consumer in consumers:
|
||||
consumer.poll()
|
||||
|
||||
time.sleep(max(0, 1 - time.time() - now))
|
||||
time.sleep(max(0, 1 - (time.time() - now)))
|
||||
|
||||
# if STATUS.get('load_enabled'):
|
||||
# write(wd, CMD_DISABLE_LOAD)
|
||||
|
|
|
@ -2,9 +2,12 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict
|
||||
|
||||
from ..protocol import ChargeController
|
||||
|
||||
|
||||
class BaseConsumer(ABC):
|
||||
settings: Dict[str, Any]
|
||||
controller: ChargeController | None = None
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, settings: Dict[str, Any]) -> None:
|
||||
|
|
|
@ -13,7 +13,7 @@ MAP_VALUES: Dict[DataName, Dict[str, Any]] = {
|
|||
# DataName.BATTERY_VOLTAGE_MIN: {},
|
||||
# DataName.BATTERY_VOLTAGE_MAX: {},
|
||||
# DataName.CHARGE_MAX_CURRENT: {},
|
||||
# DataName._DISCHARGE_MAX_CURRENT: {},
|
||||
# DataName.DISCHARGE_MAX_CURRENT: {},
|
||||
# DataName.CHARGE_MAX_POWER: {},
|
||||
# DataName.DISCHARGE_MAX_POWER: {},
|
||||
# DataName.CHARGE_AMP_HOUR: {},
|
||||
|
@ -116,28 +116,37 @@ PayloadType: TypeAlias = str | bytes | bytearray | int | float | None
|
|||
|
||||
|
||||
class MqttConsumer(BaseConsumer):
|
||||
client: mqtt.Client
|
||||
initialized: List[str]
|
||||
|
||||
_client: mqtt.Client | None = None
|
||||
|
||||
def __init__(self, settings: Dict[str, Any]) -> None:
|
||||
self.initialized = []
|
||||
|
||||
super().__init__(settings)
|
||||
self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
|
||||
self.client.on_connect = self.on_connect
|
||||
self.client.on_message = self.on_message
|
||||
self.client.on_disconnect = self.on_disconnect
|
||||
self.client.on_connect_fail = self.on_connect_fail
|
||||
|
||||
@property
|
||||
def client(self) -> mqtt.Client:
|
||||
if self._client is not None:
|
||||
return self._client
|
||||
|
||||
self._client = mqtt.Client(
|
||||
client_id=self.settings["client"]["id"], userdata=self
|
||||
)
|
||||
self._client.on_connect = self.on_connect
|
||||
self._client.on_message = self.on_message
|
||||
self._client.on_disconnect = self.on_disconnect
|
||||
self._client.on_connect_fail = self.on_connect_fail
|
||||
# Will must be set before connecting!!
|
||||
self.client.will_set(
|
||||
self._client.will_set(
|
||||
f"{self.topic_prefix}/available", payload="offline", retain=True
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
self.client.connect(
|
||||
settings["client"]["host"],
|
||||
settings["client"]["port"],
|
||||
settings["client"]["keepalive"],
|
||||
self._client.connect(
|
||||
self.settings["client"]["host"],
|
||||
self.settings["client"]["port"],
|
||||
self.settings["client"]["keepalive"],
|
||||
)
|
||||
break
|
||||
except OSError as err:
|
||||
|
@ -151,6 +160,7 @@ class MqttConsumer(BaseConsumer):
|
|||
raise
|
||||
print(err)
|
||||
sleep(0.1)
|
||||
return self._client
|
||||
|
||||
def config(self, settings: Dict[str, Any]):
|
||||
super().config(settings)
|
||||
|
@ -167,9 +177,19 @@ class MqttConsumer(BaseConsumer):
|
|||
|
||||
settings.setdefault("discovery_prefix", "homeassistant")
|
||||
|
||||
_controller_id: str | None = None
|
||||
|
||||
@property
|
||||
def controller_id(self) -> str:
|
||||
assert self.controller is not None
|
||||
# Controller serial is fetched from device, cache it.
|
||||
if self._controller_id is None:
|
||||
self._controller_id = self.controller.serial
|
||||
return f"{self.controller.manufacturer_id}_{self._controller_id}"
|
||||
|
||||
@property
|
||||
def topic_prefix(self):
|
||||
return f"{self.settings['prefix']}/{self.settings['device_id']}"
|
||||
return f"{self.settings['prefix']}/{self.controller_id}"
|
||||
|
||||
def get_ha_config(
|
||||
self,
|
||||
|
@ -181,21 +201,25 @@ class MqttConsumer(BaseConsumer):
|
|||
state_class: Optional[str] = None,
|
||||
):
|
||||
assert state_class in [None, "measurement", "total", "total_increasing"]
|
||||
assert self.controller is not None
|
||||
|
||||
res = {
|
||||
"~": f"{self.topic_prefix}",
|
||||
"unique_id": f"{self.settings['device_id']}_{id}",
|
||||
"unique_id": f"{self.controller_id}_{id}",
|
||||
"object_id": f"{self.controller_id}_{id}",
|
||||
"availability_topic": "~/available",
|
||||
"state_topic": f"~/{id}",
|
||||
"name": name,
|
||||
"device": {
|
||||
"identifiers": [
|
||||
self.settings["device_id"],
|
||||
self.controller_id,
|
||||
],
|
||||
# TODO: Get charger serial and use for identifier instead
|
||||
# See: https://www.home-assistant.io/integrations/sensor.mqtt/#device
|
||||
# "via_device": self.settings["device_id"],
|
||||
"manufacturer": self.controller.manufacturer,
|
||||
"model": self.controller.model,
|
||||
"hw_version": self.controller.version,
|
||||
"via_device": self.settings["device_id"],
|
||||
"suggested_area": "Solar panel",
|
||||
"name": self.controller.name,
|
||||
},
|
||||
"force_update": True,
|
||||
"expire_after": expiry,
|
||||
|
@ -253,22 +277,25 @@ class MqttConsumer(BaseConsumer):
|
|||
def write(self, data: Dict[str, PayloadType]):
|
||||
self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))
|
||||
|
||||
for k, v in data.items():
|
||||
if k in MAP_VALUES:
|
||||
if k not in self.initialized:
|
||||
km = MAP_VALUES[DataName(k)]
|
||||
pretty_name = k.replace("_", " ").capitalize()
|
||||
for dataname, data_value in data.items():
|
||||
if dataname in MAP_VALUES:
|
||||
if dataname not in self.initialized:
|
||||
km = MAP_VALUES[DataName(dataname)]
|
||||
pretty_name = dataname.replace("_", " ").capitalize()
|
||||
disc_prefix = self.settings["discovery_prefix"]
|
||||
device_id = self.settings["device_id"]
|
||||
|
||||
self.client.publish(
|
||||
f"{disc_prefix}/sensor/{device_id}_{k}/config",
|
||||
payload=json.dumps(self.get_ha_config(k, pretty_name, **km)),
|
||||
f"{disc_prefix}/sensor/{self.controller_id}/{dataname}/config",
|
||||
payload=json.dumps(
|
||||
self.get_ha_config(dataname, pretty_name, **km)
|
||||
),
|
||||
retain=True,
|
||||
)
|
||||
self.initialized.append(k)
|
||||
self.initialized.append(dataname)
|
||||
|
||||
self.client.publish(f"{self.topic_prefix}/{k}", v, retain=True)
|
||||
self.client.publish(
|
||||
f"{self.topic_prefix}/{dataname}", data_value, retain=True
|
||||
)
|
||||
|
||||
def exit(self):
|
||||
self.client.publish(
|
||||
|
|
|
@ -8,7 +8,14 @@ from libscrc import modbus # type: ignore
|
|||
|
||||
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
|
||||
from .interfaces import BaseInterface
|
||||
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, ChargerState, DataItem
|
||||
from .solar_types import (
|
||||
DATA_BATTERY_STATE,
|
||||
HISTORICAL_DATA,
|
||||
ChargerState,
|
||||
DataItem,
|
||||
HistoricalData,
|
||||
HistoricalExtraInfo,
|
||||
)
|
||||
from .util import log
|
||||
|
||||
|
||||
|
@ -146,8 +153,8 @@ def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[byte
|
|||
|
||||
|
||||
def writeMemory(fh: BaseInterface, address: int, data: bytes):
|
||||
if len(data) % 2:
|
||||
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
|
||||
if len(data) != 2:
|
||||
raise ValueError(f"Data must consist of a two-byte word, got {len(data)} bytes")
|
||||
|
||||
header = construct_write_request(address)
|
||||
write(fh, header + data)
|
||||
|
@ -159,7 +166,11 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
|
|||
header = fh.read(3)
|
||||
if header and len(header) == 3:
|
||||
operation, size, address = header
|
||||
rdata = fh.read(size * 2)
|
||||
log(header)
|
||||
# size field is zero when writing device name for whatever reason
|
||||
# write command seems to only accept a single word, so this is fine;
|
||||
# we just hardcode the number of bytes read to two here.
|
||||
rdata = fh.read(2)
|
||||
_crc = fh.read(2)
|
||||
if rdata and _crc:
|
||||
try:
|
||||
|
@ -176,6 +187,19 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
|
|||
return None
|
||||
|
||||
|
||||
def writeMemoryMultiple(fh: BaseInterface, address: int, data: bytes):
|
||||
if len(data) % 2:
|
||||
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
|
||||
res = bytearray()
|
||||
for i in range(len(data) // 2):
|
||||
d = data[i * 2 : (i + 1) * 2]
|
||||
log(address + i, d)
|
||||
r = writeMemory(fh, address + i, d)
|
||||
if r:
|
||||
res.extend(r)
|
||||
return res
|
||||
|
||||
|
||||
def try_read_parse(
|
||||
dev: BaseInterface,
|
||||
address: int,
|
||||
|
@ -205,11 +229,19 @@ def try_read_parse(
|
|||
class ChargeController:
|
||||
device: BaseInterface
|
||||
|
||||
manufacturer: str = "SRNE Solar Co., Ltd."
|
||||
manufacturer_id: str = "srne"
|
||||
|
||||
def __init__(self, device: BaseInterface):
|
||||
self.device = device
|
||||
|
||||
_cached_serial: str | None = None
|
||||
|
||||
@property
|
||||
def serial(self) -> str:
|
||||
if self._cached_serial is not None:
|
||||
return self._cached_serial
|
||||
|
||||
data = readMemory(self.device, 0x18, 3)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
@ -217,18 +249,31 @@ class ChargeController:
|
|||
p1 = data[0]
|
||||
p2 = data[1]
|
||||
p3 = (data[2] << 8) + data[3]
|
||||
return f"{p1}-{p2}-{p3}"
|
||||
|
||||
self._cached_serial = f"{p1}-{p2}-{p3}"
|
||||
return self._cached_serial
|
||||
|
||||
_cached_model: str | None = None
|
||||
|
||||
@property
|
||||
def model(self) -> str:
|
||||
if self._cached_model is not None:
|
||||
return self._cached_model
|
||||
|
||||
data = readMemory(self.device, 0x0C, 8)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
||||
return data.decode("utf-8").strip()
|
||||
self._cached_model = data.decode("utf-8").strip()
|
||||
return self._cached_model
|
||||
|
||||
_cached_version: str | None = None
|
||||
|
||||
@property
|
||||
def version(self) -> str:
|
||||
if self._cached_version is not None:
|
||||
return self._cached_version
|
||||
|
||||
data = readMemory(self.device, 0x14, 4)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
@ -237,7 +282,42 @@ class ChargeController:
|
|||
minor = data[2]
|
||||
patch = data[3]
|
||||
|
||||
return f"{major}.{minor}.{patch}"
|
||||
self._cached_version = f"{major}.{minor}.{patch}"
|
||||
return self._cached_version
|
||||
|
||||
_cached_name: str | None = None
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
if self._cached_name is not None:
|
||||
return self._cached_name
|
||||
data = readMemory(self.device, 0x0049, 16)
|
||||
if data is None:
|
||||
raise IOError
|
||||
res = data.decode("UTF-16BE").strip()
|
||||
return res
|
||||
|
||||
@name.setter
|
||||
def name(self, value: str):
|
||||
bin_value = bytearray(value.encode("UTF-16BE"))
|
||||
if len(bin_value) > 32:
|
||||
raise ValueError(
|
||||
f"value must be no more than 32 bytes once encoded as UTF-16BE. {len(bin_value)} bytes supplied"
|
||||
)
|
||||
|
||||
# Pad name to 32 bytes to ensure ensure nothing is left of old name
|
||||
while len(bin_value) < 32:
|
||||
bin_value.extend(b"\x00\x20")
|
||||
print(len(bin_value), bin_value)
|
||||
|
||||
data = writeMemoryMultiple(self.device, 0x0049, bin_value)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
||||
res = data.decode("UTF-16BE").strip()
|
||||
if res != value:
|
||||
log(f"setting device name failed; {res!r} != {value!r}")
|
||||
self._cached_name = value
|
||||
|
||||
@property
|
||||
def load_enabled(self) -> bool:
|
||||
|
@ -259,23 +339,31 @@ class ChargeController:
|
|||
|
||||
@property
|
||||
def state(self) -> ChargerState:
|
||||
raise NotImplementedError
|
||||
"""
|
||||
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
|
||||
if data:
|
||||
data[DataName.CALCULATED_BATTERY_POWER] = float(
|
||||
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
|
||||
* Decimal(str(data.get(DataName.BATTERY_CURRENT, 0)))
|
||||
)
|
||||
data[DataName.CALCULATED_PANEL_POWER] = float(
|
||||
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
|
||||
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
|
||||
)
|
||||
data[DataName.CALCULATED_LOAD_POWER] = float(
|
||||
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
|
||||
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
|
||||
)
|
||||
log(data)
|
||||
for consumer in consumers:
|
||||
consumer.write(data)
|
||||
"""
|
||||
data = readMemory(self.device, 0x0100, 11)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
||||
return ChargerState(data)
|
||||
|
||||
def get_historical(self, day) -> HistoricalData:
|
||||
data = readMemory(self.device, 0xF000 + day, 10)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
||||
return HistoricalData(data)
|
||||
|
||||
@property
|
||||
def today(self) -> HistoricalData:
|
||||
data = readMemory(self.device, 0x010B, 10)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
||||
return HistoricalData(data)
|
||||
|
||||
@property
|
||||
def extra(self) -> HistoricalExtraInfo:
|
||||
data = readMemory(self.device, 0x0115, 11)
|
||||
if data is None:
|
||||
raise IOError # FIXME: Raise specific error in readMemory
|
||||
|
||||
return HistoricalExtraInfo(data)
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import struct
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum, unique
|
||||
from typing import Callable, Optional
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
|
||||
@unique
|
||||
|
@ -21,7 +22,7 @@ class DataName(str, Enum):
|
|||
BATTERY_VOLTAGE_MIN = "battery_voltage_min"
|
||||
BATTERY_VOLTAGE_MAX = "battery_voltage_max"
|
||||
CHARGE_MAX_CURRENT = "charge_max_current"
|
||||
_DISCHARGE_MAX_CURRENT = "_discharge_max_current?"
|
||||
DISCHARGE_MAX_CURRENT = "discharge_max_current"
|
||||
CHARGE_MAX_POWER = "charge_max_power"
|
||||
DISCHARGE_MAX_POWER = "discharge_max_power"
|
||||
CHARGE_AMP_HOUR = "charge_amp_hour"
|
||||
|
@ -104,13 +105,14 @@ HISTORICAL_DATA = [
|
|||
DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10),
|
||||
DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10),
|
||||
DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
|
||||
DataItem(DataName._DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
|
||||
DataItem(DataName.DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
|
||||
DataItem(DataName.CHARGE_MAX_POWER, "H", "W"),
|
||||
DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"),
|
||||
DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"),
|
||||
DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"),
|
||||
DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"),
|
||||
DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"),
|
||||
#
|
||||
DataItem(DataName.RUN_DAYS, "H"),
|
||||
DataItem(DataName.DISCHARGE_COUNT, "H"),
|
||||
DataItem(DataName.FULL_CHARGE_COUNT, "H"),
|
||||
|
@ -121,6 +123,178 @@ HISTORICAL_DATA = [
|
|||
]
|
||||
|
||||
|
||||
class ChargerState:
|
||||
class DecodedData(ABC):
|
||||
@abstractmethod
|
||||
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||
raise NotImplementedError
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def as_dict(self) -> Dict[DataName, Any]:
|
||||
...
|
||||
|
||||
|
||||
class ChargerState(DecodedData):
|
||||
battery_charge: int
|
||||
battery_voltage: float
|
||||
battery_current: float
|
||||
internal_temperature: int
|
||||
battery_temperature: int
|
||||
load_voltage: float
|
||||
load_current: float
|
||||
load_power: float
|
||||
panel_voltage: float
|
||||
panel_current: float
|
||||
panel_power: float
|
||||
load_enabled: bool
|
||||
|
||||
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||
(
|
||||
_battery_charge,
|
||||
_battery_voltage,
|
||||
_battery_current,
|
||||
_internal_temperature,
|
||||
_battery_temperature,
|
||||
_load_voltage,
|
||||
_load_current,
|
||||
_load_power,
|
||||
_panel_voltage,
|
||||
_panel_current,
|
||||
_panel_power,
|
||||
_load_enabled,
|
||||
) = struct.unpack("!HHHBBHHHHHHx?", data)
|
||||
|
||||
self.battery_charge = _battery_charge
|
||||
self.battery_voltage = _battery_voltage / 10
|
||||
self.battery_current = _battery_current / 100
|
||||
self.internal_temperature = parse_temperature(_internal_temperature)
|
||||
self.battery_temperature = parse_temperature(_battery_temperature)
|
||||
self.load_voltage = _load_voltage / 10
|
||||
self.load_current = _load_current / 100
|
||||
self.load_power = _load_power
|
||||
self.panel_voltage = _panel_voltage / 10
|
||||
self.panel_current = _panel_current / 100
|
||||
self.panel_power = _panel_power
|
||||
self.load_enabled = bool(_load_enabled)
|
||||
|
||||
@property
|
||||
def calculated_battery_power(self) -> float:
|
||||
return self.battery_voltage * self.battery_current
|
||||
|
||||
@property
|
||||
def calculated_panel_power(self) -> float:
|
||||
return self.panel_voltage * self.panel_current
|
||||
|
||||
@property
|
||||
def calculated_load_power(self) -> float:
|
||||
return self.load_voltage * self.load_current
|
||||
|
||||
def as_dict(self):
|
||||
return {
|
||||
DataName.BATTERY_CHARGE: self.battery_charge,
|
||||
DataName.BATTERY_VOLTAGE: self.battery_voltage,
|
||||
DataName.BATTERY_CURRENT: self.battery_current,
|
||||
DataName.INTERNAL_TEMPERATURE: self.internal_temperature,
|
||||
DataName.BATTERY_TEMPERATURE: self.battery_temperature,
|
||||
DataName.LOAD_VOLTAGE: self.load_voltage,
|
||||
DataName.LOAD_CURRENT: self.load_current,
|
||||
DataName.LOAD_POWER: self.load_power,
|
||||
DataName.PANEL_VOLTAGE: self.panel_voltage,
|
||||
DataName.PANEL_CURRENT: self.panel_current,
|
||||
DataName.PANEL_POWER: self.panel_power,
|
||||
DataName.LOAD_ENABLED: self.load_enabled,
|
||||
DataName.CALCULATED_BATTERY_POWER: self.calculated_battery_power,
|
||||
DataName.CALCULATED_PANEL_POWER: self.calculated_panel_power,
|
||||
DataName.CALCULATED_LOAD_POWER: self.calculated_load_power,
|
||||
}
|
||||
|
||||
|
||||
class HistoricalData(DecodedData):
|
||||
battery_voltage_min: float
|
||||
battery_voltage_max: float
|
||||
charge_max_current: float
|
||||
discharge_max_current: float
|
||||
charge_max_power: int
|
||||
discharge_max_power: int
|
||||
charge_amp_hour: int
|
||||
discharge_amp_hour: int
|
||||
production_energy: int
|
||||
consumption_energy: int
|
||||
|
||||
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||
(
|
||||
_battery_voltage_min,
|
||||
_battery_voltage_max,
|
||||
_charge_max_current,
|
||||
_discharge_max_current,
|
||||
_charge_max_power,
|
||||
_discharge_max_power,
|
||||
_charge_amp_hour,
|
||||
_discharge_amp_hour,
|
||||
_production_energy,
|
||||
_consumption_energy,
|
||||
) = struct.unpack("!HHHHHHHHHH", data)
|
||||
|
||||
self.battery_voltage_min = _battery_voltage_min / 10
|
||||
self.battery_voltage_max = _battery_voltage_max / 10
|
||||
self.charge_max_current = _charge_max_current / 100
|
||||
self.discharge_max_current = _discharge_max_current / 100
|
||||
self.charge_max_power = _charge_max_power
|
||||
self.discharge_max_power = _discharge_max_power
|
||||
self.charge_amp_hour = _charge_amp_hour
|
||||
self.discharge_amp_hour = _discharge_amp_hour
|
||||
self.production_energy = _production_energy
|
||||
self.consumption_energy = _consumption_energy
|
||||
|
||||
def as_dict(self):
|
||||
return {
|
||||
DataName.BATTERY_VOLTAGE_MIN: self.battery_voltage_min,
|
||||
DataName.BATTERY_VOLTAGE_MAX: self.battery_voltage_max,
|
||||
DataName.CHARGE_MAX_CURRENT: self.charge_max_current,
|
||||
DataName.DISCHARGE_MAX_CURRENT: self.discharge_max_current,
|
||||
DataName.CHARGE_MAX_POWER: self.charge_max_power,
|
||||
DataName.DISCHARGE_MAX_POWER: self.discharge_max_power,
|
||||
DataName.CHARGE_AMP_HOUR: self.charge_amp_hour,
|
||||
DataName.DISCHARGE_AMP_HOUR: self.discharge_amp_hour,
|
||||
DataName.PRODUCTION_ENERGY: self.production_energy,
|
||||
DataName.CONSUMPTION_ENERGY: self.consumption_energy,
|
||||
}
|
||||
|
||||
|
||||
class HistoricalExtraInfo(DecodedData):
|
||||
run_days: int
|
||||
discharge_count: int
|
||||
full_charge_count: int
|
||||
total_charge_amp_hours: int
|
||||
total_discharge_amp_hours: int
|
||||
total_production_energy: int
|
||||
total_consumption_energy: int
|
||||
|
||||
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||
(
|
||||
_run_days,
|
||||
_discharge_count,
|
||||
_full_charge_count,
|
||||
_total_charge_amp_hours,
|
||||
_total_discharge_amp_hours,
|
||||
_total_production_energy,
|
||||
_total_consumption_energy,
|
||||
) = struct.unpack("!HHHLLLL", data)
|
||||
|
||||
self.run_days = _run_days
|
||||
self.discharge_count = _discharge_count
|
||||
self.full_charge_count = _full_charge_count
|
||||
self.total_charge_amp_hours = _total_charge_amp_hours
|
||||
self.total_discharge_amp_hours = _total_discharge_amp_hours
|
||||
self.total_production_energy = _total_production_energy
|
||||
self.total_consumption_energy = _total_consumption_energy
|
||||
|
||||
def as_dict(self):
|
||||
return {
|
||||
DataName.RUN_DAYS: self.run_days,
|
||||
DataName.DISCHARGE_COUNT: self.discharge_count,
|
||||
DataName.FULL_CHARGE_COUNT: self.full_charge_count,
|
||||
DataName.TOTAL_CHARGE_AMP_HOURS: self.total_charge_amp_hours,
|
||||
DataName.TOTAL_DISCHARGE_AMP_HOURS: self.total_discharge_amp_hours,
|
||||
DataName.TOTAL_PRODUCTION_ENERGY: self.total_production_energy,
|
||||
DataName.TOTAL_CONSUMPTION_ENERGY: self.total_consumption_energy,
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue