Compare commits
7 commits
4bb77c3bb3
...
67a25eeef9
Author | SHA1 | Date | |
---|---|---|---|
67a25eeef9 | |||
6c0f1c3d13 | |||
4dc42ee6f5 | |||
3aa6b13615 | |||
71919fc406 | |||
fe9c6a82ff | |||
f0c2057428 |
7 changed files with 389 additions and 107 deletions
|
@ -20,7 +20,7 @@ HISTORICAL_KEYS = {
|
||||||
DataName.BATTERY_VOLTAGE_MIN,
|
DataName.BATTERY_VOLTAGE_MIN,
|
||||||
DataName.BATTERY_VOLTAGE_MAX,
|
DataName.BATTERY_VOLTAGE_MAX,
|
||||||
DataName.CHARGE_MAX_CURRENT,
|
DataName.CHARGE_MAX_CURRENT,
|
||||||
DataName._DISCHARGE_MAX_CURRENT,
|
DataName.DISCHARGE_MAX_CURRENT,
|
||||||
DataName.CHARGE_MAX_POWER,
|
DataName.CHARGE_MAX_POWER,
|
||||||
DataName.DISCHARGE_MAX_POWER,
|
DataName.DISCHARGE_MAX_POWER,
|
||||||
DataName.CHARGE_AMP_HOUR,
|
DataName.CHARGE_AMP_HOUR,
|
||||||
|
@ -58,7 +58,7 @@ KNOWN_KEYS = HISTORICAL_KEYS.union(INSTANT_KEYS)
|
||||||
MAP = {
|
MAP = {
|
||||||
"_internal_temperature?": "internal_temp",
|
"_internal_temperature?": "internal_temp",
|
||||||
"unknown1": "charge_max_current",
|
"unknown1": "charge_max_current",
|
||||||
"unknown2": "_discharge_max_current?",
|
"unknown2": "discharge_max_current",
|
||||||
"internal_temperature": "internal_temp",
|
"internal_temperature": "internal_temp",
|
||||||
"battery_temperature": "battery_temp",
|
"battery_temperature": "battery_temp",
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,3 +20,7 @@ if __name__ == "__main__":
|
||||||
sleep(5)
|
sleep(5)
|
||||||
cc.load_enabled = False
|
cc.load_enabled = False
|
||||||
print(f"Load enabled: {cc.load_enabled}")
|
print(f"Load enabled: {cc.load_enabled}")
|
||||||
|
|
||||||
|
# print(f"Name: {cc.name}")
|
||||||
|
# cc.name = "☀️ 🔌🔋Charger"
|
||||||
|
# print(f"Name: {cc.name}")
|
||||||
|
|
|
@ -2,15 +2,12 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import time
|
import time
|
||||||
from decimal import Decimal
|
|
||||||
from typing import cast
|
|
||||||
|
|
||||||
from bluepy.btle import BTLEDisconnectError # type: ignore
|
from bluepy.btle import BTLEDisconnectError # type: ignore
|
||||||
from serial import SerialException # type: ignore
|
from serial import SerialException # type: ignore
|
||||||
|
|
||||||
from .config import get_config, get_consumers, get_interface
|
from .config import get_config, get_consumers, get_interface
|
||||||
from .protocol import parse_battery_state, parse_historical_entry, try_read_parse
|
from .protocol import ChargeController
|
||||||
from .solar_types import DataName
|
|
||||||
from .util import Periodical, log
|
from .util import Periodical, log
|
||||||
|
|
||||||
|
|
||||||
|
@ -35,67 +32,56 @@ def main():
|
||||||
with get_interface() as dev:
|
with get_interface() as dev:
|
||||||
log("Connected.")
|
log("Connected.")
|
||||||
|
|
||||||
|
cc = ChargeController(dev)
|
||||||
|
log(f"Controller model: {cc.model}")
|
||||||
|
log(f"Controller version: {cc.version}")
|
||||||
|
log(f"Controller serial: {cc.serial}")
|
||||||
|
for consumer in consumers:
|
||||||
|
consumer.controller = cc
|
||||||
|
|
||||||
# write(dev, construct_request(0, 32))
|
# write(dev, construct_request(0, 32))
|
||||||
|
|
||||||
# Memory dump
|
# Memory dump
|
||||||
# for address in range(0, 0x10000, 16):
|
# for address in range(0, 0x10000, 16):
|
||||||
# log(f"Reading 0x{address:04X}...")
|
# log(f"Reading 0x{address:04X}...")
|
||||||
# write(wd, construct_request(address, 16))
|
# write(wd, construct_request(address, 16))
|
||||||
days = 7
|
extra = cc.extra
|
||||||
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
|
days = extra.run_days
|
||||||
if res:
|
|
||||||
log(res)
|
res = cc.today.as_dict()
|
||||||
for consumer in consumers:
|
res.update(extra.as_dict())
|
||||||
consumer.write(res)
|
for consumer in consumers:
|
||||||
days = cast(int, res.get("run_days", 7))
|
consumer.write(res)
|
||||||
|
del extra
|
||||||
|
|
||||||
for i in range(days):
|
for i in range(days):
|
||||||
res = try_read_parse(
|
hist = cc.get_historical(i)
|
||||||
dev, 0xF000 + i, 10, parse_historical_entry
|
res = hist.as_dict()
|
||||||
)
|
log({i: res})
|
||||||
if res:
|
for consumer in consumers:
|
||||||
log({i: res})
|
consumer.write({str(i): res})
|
||||||
for consumer in consumers:
|
|
||||||
consumer.write({str(i): res})
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
|
||||||
if per_voltages(now):
|
if per_voltages(now):
|
||||||
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
|
data = cc.state.as_dict()
|
||||||
if data:
|
log(data)
|
||||||
data[DataName.CALCULATED_BATTERY_POWER] = float(
|
for consumer in consumers:
|
||||||
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
|
consumer.write(data)
|
||||||
* Decimal(
|
|
||||||
str(data.get(DataName.BATTERY_CURRENT, 0))
|
|
||||||
)
|
|
||||||
)
|
|
||||||
data[DataName.CALCULATED_PANEL_POWER] = float(
|
|
||||||
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
|
|
||||||
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
|
|
||||||
)
|
|
||||||
data[DataName.CALCULATED_LOAD_POWER] = float(
|
|
||||||
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
|
|
||||||
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
|
|
||||||
)
|
|
||||||
log(data)
|
|
||||||
for consumer in consumers:
|
|
||||||
consumer.write(data)
|
|
||||||
|
|
||||||
if per_current_hist(now):
|
if per_current_hist(now):
|
||||||
data = try_read_parse(
|
data = cc.today.as_dict()
|
||||||
dev, 0x010B, 21, parse_historical_entry
|
data.update(cc.extra.as_dict())
|
||||||
)
|
log(data)
|
||||||
if data:
|
for consumer in consumers:
|
||||||
log(data)
|
consumer.write(data)
|
||||||
for consumer in consumers:
|
|
||||||
consumer.write(data)
|
|
||||||
|
|
||||||
# print(".")
|
# print(".")
|
||||||
for consumer in consumers:
|
for consumer in consumers:
|
||||||
consumer.poll()
|
consumer.poll()
|
||||||
|
|
||||||
time.sleep(max(0, 1 - time.time() - now))
|
time.sleep(max(0, 1 - (time.time() - now)))
|
||||||
|
|
||||||
# if STATUS.get('load_enabled'):
|
# if STATUS.get('load_enabled'):
|
||||||
# write(wd, CMD_DISABLE_LOAD)
|
# write(wd, CMD_DISABLE_LOAD)
|
||||||
|
|
|
@ -2,9 +2,12 @@
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
from ..protocol import ChargeController
|
||||||
|
|
||||||
|
|
||||||
class BaseConsumer(ABC):
|
class BaseConsumer(ABC):
|
||||||
settings: Dict[str, Any]
|
settings: Dict[str, Any]
|
||||||
|
controller: ChargeController | None = None
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def __init__(self, settings: Dict[str, Any]) -> None:
|
def __init__(self, settings: Dict[str, Any]) -> None:
|
||||||
|
|
|
@ -13,7 +13,7 @@ MAP_VALUES: Dict[DataName, Dict[str, Any]] = {
|
||||||
# DataName.BATTERY_VOLTAGE_MIN: {},
|
# DataName.BATTERY_VOLTAGE_MIN: {},
|
||||||
# DataName.BATTERY_VOLTAGE_MAX: {},
|
# DataName.BATTERY_VOLTAGE_MAX: {},
|
||||||
# DataName.CHARGE_MAX_CURRENT: {},
|
# DataName.CHARGE_MAX_CURRENT: {},
|
||||||
# DataName._DISCHARGE_MAX_CURRENT: {},
|
# DataName.DISCHARGE_MAX_CURRENT: {},
|
||||||
# DataName.CHARGE_MAX_POWER: {},
|
# DataName.CHARGE_MAX_POWER: {},
|
||||||
# DataName.DISCHARGE_MAX_POWER: {},
|
# DataName.DISCHARGE_MAX_POWER: {},
|
||||||
# DataName.CHARGE_AMP_HOUR: {},
|
# DataName.CHARGE_AMP_HOUR: {},
|
||||||
|
@ -116,28 +116,37 @@ PayloadType: TypeAlias = str | bytes | bytearray | int | float | None
|
||||||
|
|
||||||
|
|
||||||
class MqttConsumer(BaseConsumer):
|
class MqttConsumer(BaseConsumer):
|
||||||
client: mqtt.Client
|
|
||||||
initialized: List[str]
|
initialized: List[str]
|
||||||
|
|
||||||
|
_client: mqtt.Client | None = None
|
||||||
|
|
||||||
def __init__(self, settings: Dict[str, Any]) -> None:
|
def __init__(self, settings: Dict[str, Any]) -> None:
|
||||||
self.initialized = []
|
self.initialized = []
|
||||||
|
|
||||||
super().__init__(settings)
|
super().__init__(settings)
|
||||||
self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
|
|
||||||
self.client.on_connect = self.on_connect
|
@property
|
||||||
self.client.on_message = self.on_message
|
def client(self) -> mqtt.Client:
|
||||||
self.client.on_disconnect = self.on_disconnect
|
if self._client is not None:
|
||||||
self.client.on_connect_fail = self.on_connect_fail
|
return self._client
|
||||||
|
|
||||||
|
self._client = mqtt.Client(
|
||||||
|
client_id=self.settings["client"]["id"], userdata=self
|
||||||
|
)
|
||||||
|
self._client.on_connect = self.on_connect
|
||||||
|
self._client.on_message = self.on_message
|
||||||
|
self._client.on_disconnect = self.on_disconnect
|
||||||
|
self._client.on_connect_fail = self.on_connect_fail
|
||||||
# Will must be set before connecting!!
|
# Will must be set before connecting!!
|
||||||
self.client.will_set(
|
self._client.will_set(
|
||||||
f"{self.topic_prefix}/available", payload="offline", retain=True
|
f"{self.topic_prefix}/available", payload="offline", retain=True
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
self.client.connect(
|
self._client.connect(
|
||||||
settings["client"]["host"],
|
self.settings["client"]["host"],
|
||||||
settings["client"]["port"],
|
self.settings["client"]["port"],
|
||||||
settings["client"]["keepalive"],
|
self.settings["client"]["keepalive"],
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
except OSError as err:
|
except OSError as err:
|
||||||
|
@ -151,6 +160,7 @@ class MqttConsumer(BaseConsumer):
|
||||||
raise
|
raise
|
||||||
print(err)
|
print(err)
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
|
return self._client
|
||||||
|
|
||||||
def config(self, settings: Dict[str, Any]):
|
def config(self, settings: Dict[str, Any]):
|
||||||
super().config(settings)
|
super().config(settings)
|
||||||
|
@ -167,9 +177,19 @@ class MqttConsumer(BaseConsumer):
|
||||||
|
|
||||||
settings.setdefault("discovery_prefix", "homeassistant")
|
settings.setdefault("discovery_prefix", "homeassistant")
|
||||||
|
|
||||||
|
_controller_id: str | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def controller_id(self) -> str:
|
||||||
|
assert self.controller is not None
|
||||||
|
# Controller serial is fetched from device, cache it.
|
||||||
|
if self._controller_id is None:
|
||||||
|
self._controller_id = self.controller.serial
|
||||||
|
return f"{self.controller.manufacturer_id}_{self._controller_id}"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def topic_prefix(self):
|
def topic_prefix(self):
|
||||||
return f"{self.settings['prefix']}/{self.settings['device_id']}"
|
return f"{self.settings['prefix']}/{self.controller_id}"
|
||||||
|
|
||||||
def get_ha_config(
|
def get_ha_config(
|
||||||
self,
|
self,
|
||||||
|
@ -181,21 +201,25 @@ class MqttConsumer(BaseConsumer):
|
||||||
state_class: Optional[str] = None,
|
state_class: Optional[str] = None,
|
||||||
):
|
):
|
||||||
assert state_class in [None, "measurement", "total", "total_increasing"]
|
assert state_class in [None, "measurement", "total", "total_increasing"]
|
||||||
|
assert self.controller is not None
|
||||||
|
|
||||||
res = {
|
res = {
|
||||||
"~": f"{self.topic_prefix}",
|
"~": f"{self.topic_prefix}",
|
||||||
"unique_id": f"{self.settings['device_id']}_{id}",
|
"unique_id": f"{self.controller_id}_{id}",
|
||||||
|
"object_id": f"{self.controller_id}_{id}",
|
||||||
"availability_topic": "~/available",
|
"availability_topic": "~/available",
|
||||||
"state_topic": f"~/{id}",
|
"state_topic": f"~/{id}",
|
||||||
"name": name,
|
"name": name,
|
||||||
"device": {
|
"device": {
|
||||||
"identifiers": [
|
"identifiers": [
|
||||||
self.settings["device_id"],
|
self.controller_id,
|
||||||
],
|
],
|
||||||
# TODO: Get charger serial and use for identifier instead
|
"manufacturer": self.controller.manufacturer,
|
||||||
# See: https://www.home-assistant.io/integrations/sensor.mqtt/#device
|
"model": self.controller.model,
|
||||||
# "via_device": self.settings["device_id"],
|
"hw_version": self.controller.version,
|
||||||
|
"via_device": self.settings["device_id"],
|
||||||
"suggested_area": "Solar panel",
|
"suggested_area": "Solar panel",
|
||||||
|
"name": self.controller.name,
|
||||||
},
|
},
|
||||||
"force_update": True,
|
"force_update": True,
|
||||||
"expire_after": expiry,
|
"expire_after": expiry,
|
||||||
|
@ -253,22 +277,25 @@ class MqttConsumer(BaseConsumer):
|
||||||
def write(self, data: Dict[str, PayloadType]):
|
def write(self, data: Dict[str, PayloadType]):
|
||||||
self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))
|
self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))
|
||||||
|
|
||||||
for k, v in data.items():
|
for dataname, data_value in data.items():
|
||||||
if k in MAP_VALUES:
|
if dataname in MAP_VALUES:
|
||||||
if k not in self.initialized:
|
if dataname not in self.initialized:
|
||||||
km = MAP_VALUES[DataName(k)]
|
km = MAP_VALUES[DataName(dataname)]
|
||||||
pretty_name = k.replace("_", " ").capitalize()
|
pretty_name = dataname.replace("_", " ").capitalize()
|
||||||
disc_prefix = self.settings["discovery_prefix"]
|
disc_prefix = self.settings["discovery_prefix"]
|
||||||
device_id = self.settings["device_id"]
|
|
||||||
|
|
||||||
self.client.publish(
|
self.client.publish(
|
||||||
f"{disc_prefix}/sensor/{device_id}_{k}/config",
|
f"{disc_prefix}/sensor/{self.controller_id}/{dataname}/config",
|
||||||
payload=json.dumps(self.get_ha_config(k, pretty_name, **km)),
|
payload=json.dumps(
|
||||||
|
self.get_ha_config(dataname, pretty_name, **km)
|
||||||
|
),
|
||||||
retain=True,
|
retain=True,
|
||||||
)
|
)
|
||||||
self.initialized.append(k)
|
self.initialized.append(dataname)
|
||||||
|
|
||||||
self.client.publish(f"{self.topic_prefix}/{k}", v, retain=True)
|
self.client.publish(
|
||||||
|
f"{self.topic_prefix}/{dataname}", data_value, retain=True
|
||||||
|
)
|
||||||
|
|
||||||
def exit(self):
|
def exit(self):
|
||||||
self.client.publish(
|
self.client.publish(
|
||||||
|
|
|
@ -8,7 +8,14 @@ from libscrc import modbus # type: ignore
|
||||||
|
|
||||||
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
|
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
|
||||||
from .interfaces import BaseInterface
|
from .interfaces import BaseInterface
|
||||||
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, ChargerState, DataItem
|
from .solar_types import (
|
||||||
|
DATA_BATTERY_STATE,
|
||||||
|
HISTORICAL_DATA,
|
||||||
|
ChargerState,
|
||||||
|
DataItem,
|
||||||
|
HistoricalData,
|
||||||
|
HistoricalExtraInfo,
|
||||||
|
)
|
||||||
from .util import log
|
from .util import log
|
||||||
|
|
||||||
|
|
||||||
|
@ -146,8 +153,8 @@ def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[byte
|
||||||
|
|
||||||
|
|
||||||
def writeMemory(fh: BaseInterface, address: int, data: bytes):
|
def writeMemory(fh: BaseInterface, address: int, data: bytes):
|
||||||
if len(data) % 2:
|
if len(data) != 2:
|
||||||
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
|
raise ValueError(f"Data must consist of a two-byte word, got {len(data)} bytes")
|
||||||
|
|
||||||
header = construct_write_request(address)
|
header = construct_write_request(address)
|
||||||
write(fh, header + data)
|
write(fh, header + data)
|
||||||
|
@ -159,7 +166,11 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
|
||||||
header = fh.read(3)
|
header = fh.read(3)
|
||||||
if header and len(header) == 3:
|
if header and len(header) == 3:
|
||||||
operation, size, address = header
|
operation, size, address = header
|
||||||
rdata = fh.read(size * 2)
|
log(header)
|
||||||
|
# size field is zero when writing device name for whatever reason
|
||||||
|
# write command seems to only accept a single word, so this is fine;
|
||||||
|
# we just hardcode the number of bytes read to two here.
|
||||||
|
rdata = fh.read(2)
|
||||||
_crc = fh.read(2)
|
_crc = fh.read(2)
|
||||||
if rdata and _crc:
|
if rdata and _crc:
|
||||||
try:
|
try:
|
||||||
|
@ -176,6 +187,19 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def writeMemoryMultiple(fh: BaseInterface, address: int, data: bytes):
|
||||||
|
if len(data) % 2:
|
||||||
|
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
|
||||||
|
res = bytearray()
|
||||||
|
for i in range(len(data) // 2):
|
||||||
|
d = data[i * 2 : (i + 1) * 2]
|
||||||
|
log(address + i, d)
|
||||||
|
r = writeMemory(fh, address + i, d)
|
||||||
|
if r:
|
||||||
|
res.extend(r)
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
def try_read_parse(
|
def try_read_parse(
|
||||||
dev: BaseInterface,
|
dev: BaseInterface,
|
||||||
address: int,
|
address: int,
|
||||||
|
@ -205,11 +229,19 @@ def try_read_parse(
|
||||||
class ChargeController:
|
class ChargeController:
|
||||||
device: BaseInterface
|
device: BaseInterface
|
||||||
|
|
||||||
|
manufacturer: str = "SRNE Solar Co., Ltd."
|
||||||
|
manufacturer_id: str = "srne"
|
||||||
|
|
||||||
def __init__(self, device: BaseInterface):
|
def __init__(self, device: BaseInterface):
|
||||||
self.device = device
|
self.device = device
|
||||||
|
|
||||||
|
_cached_serial: str | None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def serial(self) -> str:
|
def serial(self) -> str:
|
||||||
|
if self._cached_serial is not None:
|
||||||
|
return self._cached_serial
|
||||||
|
|
||||||
data = readMemory(self.device, 0x18, 3)
|
data = readMemory(self.device, 0x18, 3)
|
||||||
if data is None:
|
if data is None:
|
||||||
raise IOError # FIXME: Raise specific error in readMemory
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
|
@ -217,18 +249,31 @@ class ChargeController:
|
||||||
p1 = data[0]
|
p1 = data[0]
|
||||||
p2 = data[1]
|
p2 = data[1]
|
||||||
p3 = (data[2] << 8) + data[3]
|
p3 = (data[2] << 8) + data[3]
|
||||||
return f"{p1}-{p2}-{p3}"
|
|
||||||
|
self._cached_serial = f"{p1}-{p2}-{p3}"
|
||||||
|
return self._cached_serial
|
||||||
|
|
||||||
|
_cached_model: str | None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def model(self) -> str:
|
def model(self) -> str:
|
||||||
|
if self._cached_model is not None:
|
||||||
|
return self._cached_model
|
||||||
|
|
||||||
data = readMemory(self.device, 0x0C, 8)
|
data = readMemory(self.device, 0x0C, 8)
|
||||||
if data is None:
|
if data is None:
|
||||||
raise IOError # FIXME: Raise specific error in readMemory
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
|
|
||||||
return data.decode("utf-8").strip()
|
self._cached_model = data.decode("utf-8").strip()
|
||||||
|
return self._cached_model
|
||||||
|
|
||||||
|
_cached_version: str | None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def version(self) -> str:
|
def version(self) -> str:
|
||||||
|
if self._cached_version is not None:
|
||||||
|
return self._cached_version
|
||||||
|
|
||||||
data = readMemory(self.device, 0x14, 4)
|
data = readMemory(self.device, 0x14, 4)
|
||||||
if data is None:
|
if data is None:
|
||||||
raise IOError # FIXME: Raise specific error in readMemory
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
|
@ -237,7 +282,42 @@ class ChargeController:
|
||||||
minor = data[2]
|
minor = data[2]
|
||||||
patch = data[3]
|
patch = data[3]
|
||||||
|
|
||||||
return f"{major}.{minor}.{patch}"
|
self._cached_version = f"{major}.{minor}.{patch}"
|
||||||
|
return self._cached_version
|
||||||
|
|
||||||
|
_cached_name: str | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
if self._cached_name is not None:
|
||||||
|
return self._cached_name
|
||||||
|
data = readMemory(self.device, 0x0049, 16)
|
||||||
|
if data is None:
|
||||||
|
raise IOError
|
||||||
|
res = data.decode("UTF-16BE").strip()
|
||||||
|
return res
|
||||||
|
|
||||||
|
@name.setter
|
||||||
|
def name(self, value: str):
|
||||||
|
bin_value = bytearray(value.encode("UTF-16BE"))
|
||||||
|
if len(bin_value) > 32:
|
||||||
|
raise ValueError(
|
||||||
|
f"value must be no more than 32 bytes once encoded as UTF-16BE. {len(bin_value)} bytes supplied"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pad name to 32 bytes to ensure ensure nothing is left of old name
|
||||||
|
while len(bin_value) < 32:
|
||||||
|
bin_value.extend(b"\x00\x20")
|
||||||
|
print(len(bin_value), bin_value)
|
||||||
|
|
||||||
|
data = writeMemoryMultiple(self.device, 0x0049, bin_value)
|
||||||
|
if data is None:
|
||||||
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
|
|
||||||
|
res = data.decode("UTF-16BE").strip()
|
||||||
|
if res != value:
|
||||||
|
log(f"setting device name failed; {res!r} != {value!r}")
|
||||||
|
self._cached_name = value
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def load_enabled(self) -> bool:
|
def load_enabled(self) -> bool:
|
||||||
|
@ -259,23 +339,31 @@ class ChargeController:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self) -> ChargerState:
|
def state(self) -> ChargerState:
|
||||||
raise NotImplementedError
|
data = readMemory(self.device, 0x0100, 11)
|
||||||
"""
|
if data is None:
|
||||||
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
if data:
|
|
||||||
data[DataName.CALCULATED_BATTERY_POWER] = float(
|
return ChargerState(data)
|
||||||
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
|
|
||||||
* Decimal(str(data.get(DataName.BATTERY_CURRENT, 0)))
|
def get_historical(self, day) -> HistoricalData:
|
||||||
)
|
data = readMemory(self.device, 0xF000 + day, 10)
|
||||||
data[DataName.CALCULATED_PANEL_POWER] = float(
|
if data is None:
|
||||||
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
|
|
||||||
)
|
return HistoricalData(data)
|
||||||
data[DataName.CALCULATED_LOAD_POWER] = float(
|
|
||||||
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
|
@property
|
||||||
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
|
def today(self) -> HistoricalData:
|
||||||
)
|
data = readMemory(self.device, 0x010B, 10)
|
||||||
log(data)
|
if data is None:
|
||||||
for consumer in consumers:
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
consumer.write(data)
|
|
||||||
"""
|
return HistoricalData(data)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def extra(self) -> HistoricalExtraInfo:
|
||||||
|
data = readMemory(self.device, 0x0115, 11)
|
||||||
|
if data is None:
|
||||||
|
raise IOError # FIXME: Raise specific error in readMemory
|
||||||
|
|
||||||
|
return HistoricalExtraInfo(data)
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import struct
|
import struct
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
from enum import Enum, unique
|
from enum import Enum, unique
|
||||||
from typing import Callable, Optional
|
from typing import Any, Callable, Dict, Optional
|
||||||
|
|
||||||
|
|
||||||
@unique
|
@unique
|
||||||
|
@ -21,7 +22,7 @@ class DataName(str, Enum):
|
||||||
BATTERY_VOLTAGE_MIN = "battery_voltage_min"
|
BATTERY_VOLTAGE_MIN = "battery_voltage_min"
|
||||||
BATTERY_VOLTAGE_MAX = "battery_voltage_max"
|
BATTERY_VOLTAGE_MAX = "battery_voltage_max"
|
||||||
CHARGE_MAX_CURRENT = "charge_max_current"
|
CHARGE_MAX_CURRENT = "charge_max_current"
|
||||||
_DISCHARGE_MAX_CURRENT = "_discharge_max_current?"
|
DISCHARGE_MAX_CURRENT = "discharge_max_current"
|
||||||
CHARGE_MAX_POWER = "charge_max_power"
|
CHARGE_MAX_POWER = "charge_max_power"
|
||||||
DISCHARGE_MAX_POWER = "discharge_max_power"
|
DISCHARGE_MAX_POWER = "discharge_max_power"
|
||||||
CHARGE_AMP_HOUR = "charge_amp_hour"
|
CHARGE_AMP_HOUR = "charge_amp_hour"
|
||||||
|
@ -104,13 +105,14 @@ HISTORICAL_DATA = [
|
||||||
DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10),
|
DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10),
|
||||||
DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10),
|
DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10),
|
||||||
DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
|
DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
|
||||||
DataItem(DataName._DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
|
DataItem(DataName.DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
|
||||||
DataItem(DataName.CHARGE_MAX_POWER, "H", "W"),
|
DataItem(DataName.CHARGE_MAX_POWER, "H", "W"),
|
||||||
DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"),
|
DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"),
|
||||||
DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"),
|
DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"),
|
||||||
DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"),
|
DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"),
|
||||||
DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"),
|
DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"),
|
||||||
DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"),
|
DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"),
|
||||||
|
#
|
||||||
DataItem(DataName.RUN_DAYS, "H"),
|
DataItem(DataName.RUN_DAYS, "H"),
|
||||||
DataItem(DataName.DISCHARGE_COUNT, "H"),
|
DataItem(DataName.DISCHARGE_COUNT, "H"),
|
||||||
DataItem(DataName.FULL_CHARGE_COUNT, "H"),
|
DataItem(DataName.FULL_CHARGE_COUNT, "H"),
|
||||||
|
@ -121,6 +123,178 @@ HISTORICAL_DATA = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class ChargerState:
|
class DecodedData(ABC):
|
||||||
|
@abstractmethod
|
||||||
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||||
raise NotImplementedError
|
...
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def as_dict(self) -> Dict[DataName, Any]:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class ChargerState(DecodedData):
|
||||||
|
battery_charge: int
|
||||||
|
battery_voltage: float
|
||||||
|
battery_current: float
|
||||||
|
internal_temperature: int
|
||||||
|
battery_temperature: int
|
||||||
|
load_voltage: float
|
||||||
|
load_current: float
|
||||||
|
load_power: float
|
||||||
|
panel_voltage: float
|
||||||
|
panel_current: float
|
||||||
|
panel_power: float
|
||||||
|
load_enabled: bool
|
||||||
|
|
||||||
|
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||||
|
(
|
||||||
|
_battery_charge,
|
||||||
|
_battery_voltage,
|
||||||
|
_battery_current,
|
||||||
|
_internal_temperature,
|
||||||
|
_battery_temperature,
|
||||||
|
_load_voltage,
|
||||||
|
_load_current,
|
||||||
|
_load_power,
|
||||||
|
_panel_voltage,
|
||||||
|
_panel_current,
|
||||||
|
_panel_power,
|
||||||
|
_load_enabled,
|
||||||
|
) = struct.unpack("!HHHBBHHHHHHx?", data)
|
||||||
|
|
||||||
|
self.battery_charge = _battery_charge
|
||||||
|
self.battery_voltage = _battery_voltage / 10
|
||||||
|
self.battery_current = _battery_current / 100
|
||||||
|
self.internal_temperature = parse_temperature(_internal_temperature)
|
||||||
|
self.battery_temperature = parse_temperature(_battery_temperature)
|
||||||
|
self.load_voltage = _load_voltage / 10
|
||||||
|
self.load_current = _load_current / 100
|
||||||
|
self.load_power = _load_power
|
||||||
|
self.panel_voltage = _panel_voltage / 10
|
||||||
|
self.panel_current = _panel_current / 100
|
||||||
|
self.panel_power = _panel_power
|
||||||
|
self.load_enabled = bool(_load_enabled)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def calculated_battery_power(self) -> float:
|
||||||
|
return self.battery_voltage * self.battery_current
|
||||||
|
|
||||||
|
@property
|
||||||
|
def calculated_panel_power(self) -> float:
|
||||||
|
return self.panel_voltage * self.panel_current
|
||||||
|
|
||||||
|
@property
|
||||||
|
def calculated_load_power(self) -> float:
|
||||||
|
return self.load_voltage * self.load_current
|
||||||
|
|
||||||
|
def as_dict(self):
|
||||||
|
return {
|
||||||
|
DataName.BATTERY_CHARGE: self.battery_charge,
|
||||||
|
DataName.BATTERY_VOLTAGE: self.battery_voltage,
|
||||||
|
DataName.BATTERY_CURRENT: self.battery_current,
|
||||||
|
DataName.INTERNAL_TEMPERATURE: self.internal_temperature,
|
||||||
|
DataName.BATTERY_TEMPERATURE: self.battery_temperature,
|
||||||
|
DataName.LOAD_VOLTAGE: self.load_voltage,
|
||||||
|
DataName.LOAD_CURRENT: self.load_current,
|
||||||
|
DataName.LOAD_POWER: self.load_power,
|
||||||
|
DataName.PANEL_VOLTAGE: self.panel_voltage,
|
||||||
|
DataName.PANEL_CURRENT: self.panel_current,
|
||||||
|
DataName.PANEL_POWER: self.panel_power,
|
||||||
|
DataName.LOAD_ENABLED: self.load_enabled,
|
||||||
|
DataName.CALCULATED_BATTERY_POWER: self.calculated_battery_power,
|
||||||
|
DataName.CALCULATED_PANEL_POWER: self.calculated_panel_power,
|
||||||
|
DataName.CALCULATED_LOAD_POWER: self.calculated_load_power,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class HistoricalData(DecodedData):
|
||||||
|
battery_voltage_min: float
|
||||||
|
battery_voltage_max: float
|
||||||
|
charge_max_current: float
|
||||||
|
discharge_max_current: float
|
||||||
|
charge_max_power: int
|
||||||
|
discharge_max_power: int
|
||||||
|
charge_amp_hour: int
|
||||||
|
discharge_amp_hour: int
|
||||||
|
production_energy: int
|
||||||
|
consumption_energy: int
|
||||||
|
|
||||||
|
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||||
|
(
|
||||||
|
_battery_voltage_min,
|
||||||
|
_battery_voltage_max,
|
||||||
|
_charge_max_current,
|
||||||
|
_discharge_max_current,
|
||||||
|
_charge_max_power,
|
||||||
|
_discharge_max_power,
|
||||||
|
_charge_amp_hour,
|
||||||
|
_discharge_amp_hour,
|
||||||
|
_production_energy,
|
||||||
|
_consumption_energy,
|
||||||
|
) = struct.unpack("!HHHHHHHHHH", data)
|
||||||
|
|
||||||
|
self.battery_voltage_min = _battery_voltage_min / 10
|
||||||
|
self.battery_voltage_max = _battery_voltage_max / 10
|
||||||
|
self.charge_max_current = _charge_max_current / 100
|
||||||
|
self.discharge_max_current = _discharge_max_current / 100
|
||||||
|
self.charge_max_power = _charge_max_power
|
||||||
|
self.discharge_max_power = _discharge_max_power
|
||||||
|
self.charge_amp_hour = _charge_amp_hour
|
||||||
|
self.discharge_amp_hour = _discharge_amp_hour
|
||||||
|
self.production_energy = _production_energy
|
||||||
|
self.consumption_energy = _consumption_energy
|
||||||
|
|
||||||
|
def as_dict(self):
|
||||||
|
return {
|
||||||
|
DataName.BATTERY_VOLTAGE_MIN: self.battery_voltage_min,
|
||||||
|
DataName.BATTERY_VOLTAGE_MAX: self.battery_voltage_max,
|
||||||
|
DataName.CHARGE_MAX_CURRENT: self.charge_max_current,
|
||||||
|
DataName.DISCHARGE_MAX_CURRENT: self.discharge_max_current,
|
||||||
|
DataName.CHARGE_MAX_POWER: self.charge_max_power,
|
||||||
|
DataName.DISCHARGE_MAX_POWER: self.discharge_max_power,
|
||||||
|
DataName.CHARGE_AMP_HOUR: self.charge_amp_hour,
|
||||||
|
DataName.DISCHARGE_AMP_HOUR: self.discharge_amp_hour,
|
||||||
|
DataName.PRODUCTION_ENERGY: self.production_energy,
|
||||||
|
DataName.CONSUMPTION_ENERGY: self.consumption_energy,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class HistoricalExtraInfo(DecodedData):
|
||||||
|
run_days: int
|
||||||
|
discharge_count: int
|
||||||
|
full_charge_count: int
|
||||||
|
total_charge_amp_hours: int
|
||||||
|
total_discharge_amp_hours: int
|
||||||
|
total_production_energy: int
|
||||||
|
total_consumption_energy: int
|
||||||
|
|
||||||
|
def __init__(self, data: bytes | bytearray | memoryview) -> None:
|
||||||
|
(
|
||||||
|
_run_days,
|
||||||
|
_discharge_count,
|
||||||
|
_full_charge_count,
|
||||||
|
_total_charge_amp_hours,
|
||||||
|
_total_discharge_amp_hours,
|
||||||
|
_total_production_energy,
|
||||||
|
_total_consumption_energy,
|
||||||
|
) = struct.unpack("!HHHLLLL", data)
|
||||||
|
|
||||||
|
self.run_days = _run_days
|
||||||
|
self.discharge_count = _discharge_count
|
||||||
|
self.full_charge_count = _full_charge_count
|
||||||
|
self.total_charge_amp_hours = _total_charge_amp_hours
|
||||||
|
self.total_discharge_amp_hours = _total_discharge_amp_hours
|
||||||
|
self.total_production_energy = _total_production_energy
|
||||||
|
self.total_consumption_energy = _total_consumption_energy
|
||||||
|
|
||||||
|
def as_dict(self):
|
||||||
|
return {
|
||||||
|
DataName.RUN_DAYS: self.run_days,
|
||||||
|
DataName.DISCHARGE_COUNT: self.discharge_count,
|
||||||
|
DataName.FULL_CHARGE_COUNT: self.full_charge_count,
|
||||||
|
DataName.TOTAL_CHARGE_AMP_HOURS: self.total_charge_amp_hours,
|
||||||
|
DataName.TOTAL_DISCHARGE_AMP_HOURS: self.total_discharge_amp_hours,
|
||||||
|
DataName.TOTAL_PRODUCTION_ENERGY: self.total_production_energy,
|
||||||
|
DataName.TOTAL_CONSUMPTION_ENERGY: self.total_consumption_energy,
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue