Compare commits

...

7 commits

7 changed files with 389 additions and 107 deletions

View file

@ -20,7 +20,7 @@ HISTORICAL_KEYS = {
DataName.BATTERY_VOLTAGE_MIN,
DataName.BATTERY_VOLTAGE_MAX,
DataName.CHARGE_MAX_CURRENT,
DataName._DISCHARGE_MAX_CURRENT,
DataName.DISCHARGE_MAX_CURRENT,
DataName.CHARGE_MAX_POWER,
DataName.DISCHARGE_MAX_POWER,
DataName.CHARGE_AMP_HOUR,
@ -58,7 +58,7 @@ KNOWN_KEYS = HISTORICAL_KEYS.union(INSTANT_KEYS)
MAP = {
"_internal_temperature?": "internal_temp",
"unknown1": "charge_max_current",
"unknown2": "_discharge_max_current?",
"unknown2": "discharge_max_current",
"internal_temperature": "internal_temp",
"battery_temperature": "battery_temp",
}

View file

@ -20,3 +20,7 @@ if __name__ == "__main__":
sleep(5)
cc.load_enabled = False
print(f"Load enabled: {cc.load_enabled}")
# print(f"Name: {cc.name}")
# cc.name = "☀️ 🔌🔋Charger"
# print(f"Name: {cc.name}")

View file

@ -2,15 +2,12 @@
# -*- coding: utf-8 -*-
import time
from decimal import Decimal
from typing import cast
from bluepy.btle import BTLEDisconnectError # type: ignore
from serial import SerialException # type: ignore
from .config import get_config, get_consumers, get_interface
from .protocol import parse_battery_state, parse_historical_entry, try_read_parse
from .solar_types import DataName
from .protocol import ChargeController
from .util import Periodical, log
@ -35,67 +32,56 @@ def main():
with get_interface() as dev:
log("Connected.")
cc = ChargeController(dev)
log(f"Controller model: {cc.model}")
log(f"Controller version: {cc.version}")
log(f"Controller serial: {cc.serial}")
for consumer in consumers:
consumer.controller = cc
# write(dev, construct_request(0, 32))
# Memory dump
# for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16))
days = 7
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
if res:
log(res)
for consumer in consumers:
consumer.write(res)
days = cast(int, res.get("run_days", 7))
extra = cc.extra
days = extra.run_days
res = cc.today.as_dict()
res.update(extra.as_dict())
for consumer in consumers:
consumer.write(res)
del extra
for i in range(days):
res = try_read_parse(
dev, 0xF000 + i, 10, parse_historical_entry
)
if res:
log({i: res})
for consumer in consumers:
consumer.write({str(i): res})
hist = cc.get_historical(i)
res = hist.as_dict()
log({i: res})
for consumer in consumers:
consumer.write({str(i): res})
while True:
now = time.time()
if per_voltages(now):
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
if data:
data[DataName.CALCULATED_BATTERY_POWER] = float(
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(
str(data.get(DataName.BATTERY_CURRENT, 0))
)
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
log(data)
for consumer in consumers:
consumer.write(data)
data = cc.state.as_dict()
log(data)
for consumer in consumers:
consumer.write(data)
if per_current_hist(now):
data = try_read_parse(
dev, 0x010B, 21, parse_historical_entry
)
if data:
log(data)
for consumer in consumers:
consumer.write(data)
data = cc.today.as_dict()
data.update(cc.extra.as_dict())
log(data)
for consumer in consumers:
consumer.write(data)
# print(".")
for consumer in consumers:
consumer.poll()
time.sleep(max(0, 1 - time.time() - now))
time.sleep(max(0, 1 - (time.time() - now)))
# if STATUS.get('load_enabled'):
# write(wd, CMD_DISABLE_LOAD)

View file

@ -2,9 +2,12 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from ..protocol import ChargeController
class BaseConsumer(ABC):
settings: Dict[str, Any]
controller: ChargeController | None = None
@abstractmethod
def __init__(self, settings: Dict[str, Any]) -> None:

View file

@ -13,7 +13,7 @@ MAP_VALUES: Dict[DataName, Dict[str, Any]] = {
# DataName.BATTERY_VOLTAGE_MIN: {},
# DataName.BATTERY_VOLTAGE_MAX: {},
# DataName.CHARGE_MAX_CURRENT: {},
# DataName._DISCHARGE_MAX_CURRENT: {},
# DataName.DISCHARGE_MAX_CURRENT: {},
# DataName.CHARGE_MAX_POWER: {},
# DataName.DISCHARGE_MAX_POWER: {},
# DataName.CHARGE_AMP_HOUR: {},
@ -116,28 +116,37 @@ PayloadType: TypeAlias = str | bytes | bytearray | int | float | None
class MqttConsumer(BaseConsumer):
client: mqtt.Client
initialized: List[str]
_client: mqtt.Client | None = None
def __init__(self, settings: Dict[str, Any]) -> None:
self.initialized = []
super().__init__(settings)
self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.on_connect_fail = self.on_connect_fail
@property
def client(self) -> mqtt.Client:
if self._client is not None:
return self._client
self._client = mqtt.Client(
client_id=self.settings["client"]["id"], userdata=self
)
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
self._client.on_disconnect = self.on_disconnect
self._client.on_connect_fail = self.on_connect_fail
# Will must be set before connecting!!
self.client.will_set(
self._client.will_set(
f"{self.topic_prefix}/available", payload="offline", retain=True
)
while True:
try:
self.client.connect(
settings["client"]["host"],
settings["client"]["port"],
settings["client"]["keepalive"],
self._client.connect(
self.settings["client"]["host"],
self.settings["client"]["port"],
self.settings["client"]["keepalive"],
)
break
except OSError as err:
@ -151,6 +160,7 @@ class MqttConsumer(BaseConsumer):
raise
print(err)
sleep(0.1)
return self._client
def config(self, settings: Dict[str, Any]):
super().config(settings)
@ -167,9 +177,19 @@ class MqttConsumer(BaseConsumer):
settings.setdefault("discovery_prefix", "homeassistant")
_controller_id: str | None = None
@property
def controller_id(self) -> str:
assert self.controller is not None
# Controller serial is fetched from device, cache it.
if self._controller_id is None:
self._controller_id = self.controller.serial
return f"{self.controller.manufacturer_id}_{self._controller_id}"
@property
def topic_prefix(self):
return f"{self.settings['prefix']}/{self.settings['device_id']}"
return f"{self.settings['prefix']}/{self.controller_id}"
def get_ha_config(
self,
@ -181,21 +201,25 @@ class MqttConsumer(BaseConsumer):
state_class: Optional[str] = None,
):
assert state_class in [None, "measurement", "total", "total_increasing"]
assert self.controller is not None
res = {
"~": f"{self.topic_prefix}",
"unique_id": f"{self.settings['device_id']}_{id}",
"unique_id": f"{self.controller_id}_{id}",
"object_id": f"{self.controller_id}_{id}",
"availability_topic": "~/available",
"state_topic": f"~/{id}",
"name": name,
"device": {
"identifiers": [
self.settings["device_id"],
self.controller_id,
],
# TODO: Get charger serial and use for identifier instead
# See: https://www.home-assistant.io/integrations/sensor.mqtt/#device
# "via_device": self.settings["device_id"],
"manufacturer": self.controller.manufacturer,
"model": self.controller.model,
"hw_version": self.controller.version,
"via_device": self.settings["device_id"],
"suggested_area": "Solar panel",
"name": self.controller.name,
},
"force_update": True,
"expire_after": expiry,
@ -253,22 +277,25 @@ class MqttConsumer(BaseConsumer):
def write(self, data: Dict[str, PayloadType]):
self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))
for k, v in data.items():
if k in MAP_VALUES:
if k not in self.initialized:
km = MAP_VALUES[DataName(k)]
pretty_name = k.replace("_", " ").capitalize()
for dataname, data_value in data.items():
if dataname in MAP_VALUES:
if dataname not in self.initialized:
km = MAP_VALUES[DataName(dataname)]
pretty_name = dataname.replace("_", " ").capitalize()
disc_prefix = self.settings["discovery_prefix"]
device_id = self.settings["device_id"]
self.client.publish(
f"{disc_prefix}/sensor/{device_id}_{k}/config",
payload=json.dumps(self.get_ha_config(k, pretty_name, **km)),
f"{disc_prefix}/sensor/{self.controller_id}/{dataname}/config",
payload=json.dumps(
self.get_ha_config(dataname, pretty_name, **km)
),
retain=True,
)
self.initialized.append(k)
self.initialized.append(dataname)
self.client.publish(f"{self.topic_prefix}/{k}", v, retain=True)
self.client.publish(
f"{self.topic_prefix}/{dataname}", data_value, retain=True
)
def exit(self):
self.client.publish(

View file

@ -8,7 +8,14 @@ from libscrc import modbus # type: ignore
from .constants import ACTION_READ, ACTION_WRITE, POSSIBLE_MARKER
from .interfaces import BaseInterface
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, ChargerState, DataItem
from .solar_types import (
DATA_BATTERY_STATE,
HISTORICAL_DATA,
ChargerState,
DataItem,
HistoricalData,
HistoricalExtraInfo,
)
from .util import log
@ -146,8 +153,8 @@ def readMemory(fh: BaseInterface, address: int, words: int = 1) -> Optional[byte
def writeMemory(fh: BaseInterface, address: int, data: bytes):
if len(data) % 2:
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
if len(data) != 2:
raise ValueError(f"Data must consist of a two-byte word, got {len(data)} bytes")
header = construct_write_request(address)
write(fh, header + data)
@ -159,7 +166,11 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
header = fh.read(3)
if header and len(header) == 3:
operation, size, address = header
rdata = fh.read(size * 2)
log(header)
# size field is zero when writing device name for whatever reason
# write command seems to only accept a single word, so this is fine;
# we just hardcode the number of bytes read to two here.
rdata = fh.read(2)
_crc = fh.read(2)
if rdata and _crc:
try:
@ -176,6 +187,19 @@ def writeMemory(fh: BaseInterface, address: int, data: bytes):
return None
def writeMemoryMultiple(fh: BaseInterface, address: int, data: bytes):
if len(data) % 2:
raise ValueError(f"Data must consist of two-byte words, got {len(data)} bytes")
res = bytearray()
for i in range(len(data) // 2):
d = data[i * 2 : (i + 1) * 2]
log(address + i, d)
r = writeMemory(fh, address + i, d)
if r:
res.extend(r)
return res
def try_read_parse(
dev: BaseInterface,
address: int,
@ -205,11 +229,19 @@ def try_read_parse(
class ChargeController:
device: BaseInterface
manufacturer: str = "SRNE Solar Co., Ltd."
manufacturer_id: str = "srne"
def __init__(self, device: BaseInterface):
self.device = device
_cached_serial: str | None = None
@property
def serial(self) -> str:
if self._cached_serial is not None:
return self._cached_serial
data = readMemory(self.device, 0x18, 3)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
@ -217,18 +249,31 @@ class ChargeController:
p1 = data[0]
p2 = data[1]
p3 = (data[2] << 8) + data[3]
return f"{p1}-{p2}-{p3}"
self._cached_serial = f"{p1}-{p2}-{p3}"
return self._cached_serial
_cached_model: str | None = None
@property
def model(self) -> str:
if self._cached_model is not None:
return self._cached_model
data = readMemory(self.device, 0x0C, 8)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return data.decode("utf-8").strip()
self._cached_model = data.decode("utf-8").strip()
return self._cached_model
_cached_version: str | None = None
@property
def version(self) -> str:
if self._cached_version is not None:
return self._cached_version
data = readMemory(self.device, 0x14, 4)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
@ -237,7 +282,42 @@ class ChargeController:
minor = data[2]
patch = data[3]
return f"{major}.{minor}.{patch}"
self._cached_version = f"{major}.{minor}.{patch}"
return self._cached_version
_cached_name: str | None = None
@property
def name(self) -> str:
if self._cached_name is not None:
return self._cached_name
data = readMemory(self.device, 0x0049, 16)
if data is None:
raise IOError
res = data.decode("UTF-16BE").strip()
return res
@name.setter
def name(self, value: str):
bin_value = bytearray(value.encode("UTF-16BE"))
if len(bin_value) > 32:
raise ValueError(
f"value must be no more than 32 bytes once encoded as UTF-16BE. {len(bin_value)} bytes supplied"
)
# Pad name to 32 bytes to ensure ensure nothing is left of old name
while len(bin_value) < 32:
bin_value.extend(b"\x00\x20")
print(len(bin_value), bin_value)
data = writeMemoryMultiple(self.device, 0x0049, bin_value)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
res = data.decode("UTF-16BE").strip()
if res != value:
log(f"setting device name failed; {res!r} != {value!r}")
self._cached_name = value
@property
def load_enabled(self) -> bool:
@ -259,23 +339,31 @@ class ChargeController:
@property
def state(self) -> ChargerState:
raise NotImplementedError
"""
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
if data:
data[DataName.CALCULATED_BATTERY_POWER] = float(
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.BATTERY_CURRENT, 0)))
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
log(data)
for consumer in consumers:
consumer.write(data)
"""
data = readMemory(self.device, 0x0100, 11)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return ChargerState(data)
def get_historical(self, day) -> HistoricalData:
data = readMemory(self.device, 0xF000 + day, 10)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalData(data)
@property
def today(self) -> HistoricalData:
data = readMemory(self.device, 0x010B, 10)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalData(data)
@property
def extra(self) -> HistoricalExtraInfo:
data = readMemory(self.device, 0x0115, 11)
if data is None:
raise IOError # FIXME: Raise specific error in readMemory
return HistoricalExtraInfo(data)

View file

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
import struct
from abc import ABC, abstractmethod
from enum import Enum, unique
from typing import Callable, Optional
from typing import Any, Callable, Dict, Optional
@unique
@ -21,7 +22,7 @@ class DataName(str, Enum):
BATTERY_VOLTAGE_MIN = "battery_voltage_min"
BATTERY_VOLTAGE_MAX = "battery_voltage_max"
CHARGE_MAX_CURRENT = "charge_max_current"
_DISCHARGE_MAX_CURRENT = "_discharge_max_current?"
DISCHARGE_MAX_CURRENT = "discharge_max_current"
CHARGE_MAX_POWER = "charge_max_power"
DISCHARGE_MAX_POWER = "discharge_max_power"
CHARGE_AMP_HOUR = "charge_amp_hour"
@ -104,13 +105,14 @@ HISTORICAL_DATA = [
DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10),
DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10),
DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName._DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.CHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"),
DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"),
#
DataItem(DataName.RUN_DAYS, "H"),
DataItem(DataName.DISCHARGE_COUNT, "H"),
DataItem(DataName.FULL_CHARGE_COUNT, "H"),
@ -121,6 +123,178 @@ HISTORICAL_DATA = [
]
class ChargerState:
class DecodedData(ABC):
@abstractmethod
def __init__(self, data: bytes | bytearray | memoryview) -> None:
raise NotImplementedError
...
@abstractmethod
def as_dict(self) -> Dict[DataName, Any]:
...
class ChargerState(DecodedData):
battery_charge: int
battery_voltage: float
battery_current: float
internal_temperature: int
battery_temperature: int
load_voltage: float
load_current: float
load_power: float
panel_voltage: float
panel_current: float
panel_power: float
load_enabled: bool
def __init__(self, data: bytes | bytearray | memoryview) -> None:
(
_battery_charge,
_battery_voltage,
_battery_current,
_internal_temperature,
_battery_temperature,
_load_voltage,
_load_current,
_load_power,
_panel_voltage,
_panel_current,
_panel_power,
_load_enabled,
) = struct.unpack("!HHHBBHHHHHHx?", data)
self.battery_charge = _battery_charge
self.battery_voltage = _battery_voltage / 10
self.battery_current = _battery_current / 100
self.internal_temperature = parse_temperature(_internal_temperature)
self.battery_temperature = parse_temperature(_battery_temperature)
self.load_voltage = _load_voltage / 10
self.load_current = _load_current / 100
self.load_power = _load_power
self.panel_voltage = _panel_voltage / 10
self.panel_current = _panel_current / 100
self.panel_power = _panel_power
self.load_enabled = bool(_load_enabled)
@property
def calculated_battery_power(self) -> float:
return self.battery_voltage * self.battery_current
@property
def calculated_panel_power(self) -> float:
return self.panel_voltage * self.panel_current
@property
def calculated_load_power(self) -> float:
return self.load_voltage * self.load_current
def as_dict(self):
return {
DataName.BATTERY_CHARGE: self.battery_charge,
DataName.BATTERY_VOLTAGE: self.battery_voltage,
DataName.BATTERY_CURRENT: self.battery_current,
DataName.INTERNAL_TEMPERATURE: self.internal_temperature,
DataName.BATTERY_TEMPERATURE: self.battery_temperature,
DataName.LOAD_VOLTAGE: self.load_voltage,
DataName.LOAD_CURRENT: self.load_current,
DataName.LOAD_POWER: self.load_power,
DataName.PANEL_VOLTAGE: self.panel_voltage,
DataName.PANEL_CURRENT: self.panel_current,
DataName.PANEL_POWER: self.panel_power,
DataName.LOAD_ENABLED: self.load_enabled,
DataName.CALCULATED_BATTERY_POWER: self.calculated_battery_power,
DataName.CALCULATED_PANEL_POWER: self.calculated_panel_power,
DataName.CALCULATED_LOAD_POWER: self.calculated_load_power,
}
class HistoricalData(DecodedData):
battery_voltage_min: float
battery_voltage_max: float
charge_max_current: float
discharge_max_current: float
charge_max_power: int
discharge_max_power: int
charge_amp_hour: int
discharge_amp_hour: int
production_energy: int
consumption_energy: int
def __init__(self, data: bytes | bytearray | memoryview) -> None:
(
_battery_voltage_min,
_battery_voltage_max,
_charge_max_current,
_discharge_max_current,
_charge_max_power,
_discharge_max_power,
_charge_amp_hour,
_discharge_amp_hour,
_production_energy,
_consumption_energy,
) = struct.unpack("!HHHHHHHHHH", data)
self.battery_voltage_min = _battery_voltage_min / 10
self.battery_voltage_max = _battery_voltage_max / 10
self.charge_max_current = _charge_max_current / 100
self.discharge_max_current = _discharge_max_current / 100
self.charge_max_power = _charge_max_power
self.discharge_max_power = _discharge_max_power
self.charge_amp_hour = _charge_amp_hour
self.discharge_amp_hour = _discharge_amp_hour
self.production_energy = _production_energy
self.consumption_energy = _consumption_energy
def as_dict(self):
return {
DataName.BATTERY_VOLTAGE_MIN: self.battery_voltage_min,
DataName.BATTERY_VOLTAGE_MAX: self.battery_voltage_max,
DataName.CHARGE_MAX_CURRENT: self.charge_max_current,
DataName.DISCHARGE_MAX_CURRENT: self.discharge_max_current,
DataName.CHARGE_MAX_POWER: self.charge_max_power,
DataName.DISCHARGE_MAX_POWER: self.discharge_max_power,
DataName.CHARGE_AMP_HOUR: self.charge_amp_hour,
DataName.DISCHARGE_AMP_HOUR: self.discharge_amp_hour,
DataName.PRODUCTION_ENERGY: self.production_energy,
DataName.CONSUMPTION_ENERGY: self.consumption_energy,
}
class HistoricalExtraInfo(DecodedData):
run_days: int
discharge_count: int
full_charge_count: int
total_charge_amp_hours: int
total_discharge_amp_hours: int
total_production_energy: int
total_consumption_energy: int
def __init__(self, data: bytes | bytearray | memoryview) -> None:
(
_run_days,
_discharge_count,
_full_charge_count,
_total_charge_amp_hours,
_total_discharge_amp_hours,
_total_production_energy,
_total_consumption_energy,
) = struct.unpack("!HHHLLLL", data)
self.run_days = _run_days
self.discharge_count = _discharge_count
self.full_charge_count = _full_charge_count
self.total_charge_amp_hours = _total_charge_amp_hours
self.total_discharge_amp_hours = _total_discharge_amp_hours
self.total_production_energy = _total_production_energy
self.total_consumption_energy = _total_consumption_energy
def as_dict(self):
return {
DataName.RUN_DAYS: self.run_days,
DataName.DISCHARGE_COUNT: self.discharge_count,
DataName.FULL_CHARGE_COUNT: self.full_charge_count,
DataName.TOTAL_CHARGE_AMP_HOURS: self.total_charge_amp_hours,
DataName.TOTAL_DISCHARGE_AMP_HOURS: self.total_discharge_amp_hours,
DataName.TOTAL_PRODUCTION_ENERGY: self.total_production_energy,
DataName.TOTAL_CONSUMPTION_ENERGY: self.total_consumption_energy,
}