Start organizing file structure

This commit is contained in:
Odd Stråbø 2023-01-04 18:05:09 +01:00
parent 0b3a38276a
commit 441e820ac4
12 changed files with 0 additions and 0 deletions

0
srnemqtt/__init__.py Normal file
View file

View file

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import Any, Dict
class BaseConsumer(ABC):
settings: Dict[str, Any]
@abstractmethod
def __init__(self, settings: Dict[str, Any]) -> None:
self.config(settings)
@abstractmethod
def write(self, data: Dict[str, Any]):
"""
Process and send data to wherever it is going.
Avoid blocking.
"""
pass
@abstractmethod
def poll(self):
"""
This function will be ran whenever there is down time.
If your consumer needs to do something periodically, do so here.
This function should not block.
"""
pass
def exit(self):
"""
Called on exit, clean up your handles here
"""
pass
def config(self, settings: Dict[str, Any]):
self.settings = settings
def __enter__(self):
return self
def __exit__(self, etype, value, traceback):
self.exit()

299
srnemqtt/consumers/mqtt.py Normal file
View file

@ -0,0 +1,299 @@
# -*- coding: utf-8 -*-
import json
from time import sleep
from typing import Any, Dict, List, Optional
from uuid import uuid4
import paho.mqtt.client as mqtt
from solar_types import DataName
from . import BaseConsumer
MAP_VALUES: Dict[DataName, Dict[str, Any]] = {
# DataName.BATTERY_VOLTAGE_MIN: {},
# DataName.BATTERY_VOLTAGE_MAX: {},
# DataName.CHARGE_MAX_CURRENT: {},
# DataName._DISCHARGE_MAX_CURRENT: {},
# DataName.CHARGE_MAX_POWER: {},
# DataName.DISCHARGE_MAX_POWER: {},
# DataName.CHARGE_AMP_HOUR: {},
# DataName.DISCHARGE_AMP_HOUR: {},
DataName.PRODUCTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
},
DataName.CONSUMPTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
},
# DataName.RUN_DAYS: {},
# DataName.DISCHARGE_COUNT: {},
# DataName.FULL_CHARGE_COUNT: {},
# DataName.TOTAL_CHARGE_AMP_HOURS: {},
# DataName.TOTAL_DISCHARGE_AMP_HOURS: {},
DataName.TOTAL_PRODUCTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
"expiry": 180,
},
DataName.TOTAL_CONSUMPTION_ENERGY: {
"unit": "Wh",
"type": "energy",
"state_class": "total_increasing",
"expiry": 180,
},
#
DataName.BATTERY_CHARGE: {
"unit": "%",
"type": "battery",
"state_class": "measurement",
},
DataName.BATTERY_VOLTAGE: {
"unit": "V",
"type": "voltage",
"state_class": "measurement",
},
DataName.BATTERY_CURRENT: {
"unit": "A",
"type": "current",
"state_class": "measurement",
},
DataName.INTERNAL_TEMPERATURE: {
"unit": "°C",
"type": "temperature",
"state_class": "measurement",
},
DataName.BATTERY_TEMPERATURE: {
"unit": "°C",
"type": "temperature",
"state_class": "measurement",
},
DataName.LOAD_VOLTAGE: {
"unit": "V",
"type": "voltage",
"state_class": "measurement",
},
DataName.LOAD_CURRENT: {
"unit": "A",
"type": "current",
"state_class": "measurement",
},
DataName.LOAD_POWER: {"unit": "W", "type": "power", "state_class": "measurement"},
DataName.PANEL_VOLTAGE: {
"unit": "V",
"type": "voltage",
"state_class": "measurement",
},
DataName.PANEL_CURRENT: {
"unit": "A",
"type": "current",
"state_class": "measurement",
},
DataName.PANEL_POWER: {"unit": "W", "type": "power", "state_class": "measurement"},
# DataName.LOAD_ENABLED: {},
DataName.CALCULATED_BATTERY_POWER: {
"unit": "W",
"type": "power",
"state_class": "measurement",
},
DataName.CALCULATED_PANEL_POWER: {
"unit": "W",
"type": "power",
"state_class": "measurement",
},
DataName.CALCULATED_LOAD_POWER: {
"unit": "W",
"type": "power",
"state_class": "measurement",
},
}
class MqttConsumer(BaseConsumer):
client: mqtt.Client
initialized: List[str]
def __init__(self, settings: Dict[str, Any]) -> None:
self.initialized = []
super().__init__(settings)
self.client = mqtt.Client(client_id=settings["client"]["id"], userdata=self)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.on_connect_fail = self.on_connect_fail
# Will must be set before connecting!!
self.client.will_set(
f"{self.topic_prefix}/available", payload="offline", retain=True
)
while True:
try:
self.client.connect(
settings["client"]["host"],
settings["client"]["port"],
settings["client"]["keepalive"],
)
break
except OSError as err:
# Network is unreachable
if err.errno == 101:
pass
# Temporary failure in name resolution
elif err.errno == -3:
pass
else:
raise
print(err)
sleep(0.1)
def config(self, settings: Dict[str, Any]):
super().config(settings)
settings.setdefault("client", {})
settings["client"].setdefault("id", None)
settings["client"].setdefault("host", "")
settings["client"].setdefault("port", 1883)
settings["client"].setdefault("keepalive", 60)
if not settings.get("device_id"):
settings["device_id"] = str(uuid4())
settings.setdefault("prefix", "solarmppt")
settings.setdefault("discovery_prefix", "homeassistant")
@property
def topic_prefix(self):
return f"{self.settings['prefix']}/{self.settings['device_id']}"
def get_ha_config(
self,
id,
name,
unit: Optional[str] = None,
type: Optional[str] = None,
expiry: int = 90,
state_class: Optional[str] = None,
):
assert state_class in [None, "measurement", "total", "total_increasing"]
res = {
"~": f"{self.topic_prefix}",
"unique_id": f"{self.settings['device_id']}_{id}",
"availability_topic": "~/available",
"state_topic": f"~/{id}",
"name": name,
"device": {
"identifiers": [
self.settings["device_id"],
],
# TODO: Get charger serial and use for identifier instead
# See: https://www.home-assistant.io/integrations/sensor.mqtt/#device
# "via_device": self.settings["device_id"],
"suggested_area": "Solar panel",
},
"force_update": True,
"expire_after": expiry,
}
if unit:
res["unit_of_meas"] = unit
if type:
res["dev_cla"] = type
if state_class:
res["state_class"] = state_class
return res
# The callback for when the client receives a CONNACK response from the server.
@staticmethod
def on_connect(client: mqtt.Client, userdata: "MqttConsumer", flags, rc):
print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
# client.subscribe("$SYS/#")
client.publish(
f"{userdata.topic_prefix}/available", payload="online", retain=True
)
@staticmethod
def on_connect_fail(client: mqtt.Client, userdata: "MqttConsumer"):
print(userdata.__class__.__name__, "on_connect_fail")
# The callback for when a PUBLISH message is received from the server.
@staticmethod
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
@staticmethod
def on_disconnect(client: mqtt.Client, userdata: "MqttConsumer", rc, prop=None):
print(userdata.__class__.__name__, "on_disconnect", rc)
def poll(self):
res = self.client.loop(timeout=0.1, max_packets=5)
if res != mqtt.MQTT_ERR_SUCCESS:
print(self.__class__.__name__, "loop returned non-success:", res)
try:
sleep(1)
res = self.client.reconnect()
if res != mqtt.MQTT_ERR_SUCCESS:
print(self.__class__.__name__, "Reconnect failed:", res)
except (OSError, mqtt.WebsocketConnectionError) as err:
print(self.__class__.__name__, "Reconnect failed:", err)
return super().poll()
def write(self, data: Dict[str, Any]):
self.client.publish(f"{self.topic_prefix}/raw", payload=json.dumps(data))
for k, v in data.items():
if k in MAP_VALUES:
if k not in self.initialized:
km = MAP_VALUES[DataName(k)]
pretty_name = k.replace("_", " ").capitalize()
disc_prefix = self.settings["discovery_prefix"]
device_id = self.settings["device_id"]
self.client.publish(
f"{disc_prefix}/sensor/{device_id}_{k}/config",
payload=json.dumps(self.get_ha_config(k, pretty_name, **km)),
retain=True,
)
self.initialized.append(k)
self.client.publish(f"{self.topic_prefix}/{k}", v, retain=True)
def exit(self):
self.client.publish(
f"{self.topic_prefix}/available", payload="offline", retain=True
)
while self.client.want_write():
self.client.loop_write(10)
self.client.disconnect()
return super().exit()
# Client(client_id="", clean_session=True, userdata=None,
# protocol=MQTTv311, transport="tcp")
# connect_srv(domain, keepalive=60, bind_address="")
# Connect to a broker using an SRV DNS lookup to obtain the broker address.
# Takes the following arguments:
# domain
# the DNS domain to search for SRV records.
# If None, try to determine the local domain name.
# client.will_set(topic, payload=None, qos=0, retain=False)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
# client.loop_forever()

187
srnemqtt/feasycom_ble.py Normal file
View file

@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
import io
import queue
import time
from typing import TYPE_CHECKING, Optional, cast
from bluepy import btle
if TYPE_CHECKING:
from _typeshed import ReadableBuffer, WriteableBuffer
WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb"
READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb"
class BTLEUart(io.RawIOBase):
mac: str
write_endpoint: str
read_endpoint: str
timeout: float
device: Optional[btle.Peripheral] = None
_write_handle: Optional[btle.Characteristic] = None
_read_handle: Optional[btle.Characteristic] = None
delegate: "_QueueDelegate"
_read_buffer: bytearray
class _QueueDelegate(btle.DefaultDelegate):
queue: queue.Queue
handle: Optional[int]
def __init__(self, queue, handle=None):
self.queue = queue
self.handle = handle
def handleNotification(self, cHandle: int, data: bytes):
# print("Notification:", cHandle, "sent data", binascii.b2a_hex(data))
if self.handle is not None and cHandle != self.handle:
return
self.queue.put(data)
def __init__(
self,
mac: str,
write_endpoint: str = WRITE_DEVICE,
read_endpoint: str = READ_DEVICE,
timeout: float = 30,
):
self.mac = mac
self.write_endpoint = write_endpoint
self.read_endpoint = read_endpoint
self.timeout = timeout
self.delegate = self._QueueDelegate(queue.Queue())
self._read_buffer = bytearray()
self._connect()
def _ensure_connected(self):
if self.device is None:
self._connect()
def _poll(self, timeout: float = 0.0001):
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
start = time.time()
left = timeout - (time.time() - start)
while self.device.waitForNotifications(max(left, 0) or 0.0001):
left = timeout - (time.time() - start)
if left < 0:
break
def _connect(self):
try:
del self.device
except Exception:
pass
self.device = btle.Peripheral(self.mac).withDelegate(self.delegate)
self._read_handle = self.device.getCharacteristics(uuid=self.read_endpoint)[0]
# self.delegate.handle = self._read_handle.handle
self._write_handle = self.device.getCharacteristics(uuid=self.write_endpoint)[0]
# print("Handles:", self._read_handle.handle, self._write_handle.handle)
def _read(self, num: Optional[int] = None, timeout: Optional[float] = None):
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
if timeout is None:
timeout = self.timeout
if num is None:
start = time.time()
while not len(self._read_buffer):
left = timeout - (time.time() - start)
if left < 0:
break
self._poll()
try:
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
try:
while True:
self._poll()
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
else:
start = time.time()
while len(self._read_buffer) < num:
left = timeout - (time.time() - start)
if left < 0:
break
self._poll()
try:
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
if num is None:
data = bytes(self._read_buffer.copy())
self._read_buffer.clear()
else:
data = bytes(self._read_buffer[:num])
del self._read_buffer[:num]
return data or None
def readinto(self, buffer: "WriteableBuffer") -> Optional[int]:
data = self._read(len(buffer))
if data is None:
return None
buffer[: len(data)] = data
return len(data)
def readall(self) -> bytes:
return self._read()
def read(
self, size: Optional[int] = None, timeout: Optional[float] = None
) -> Optional[bytes]:
if timeout:
_timeout = self.timeout
self.timeout = timeout
if size is None:
res = super().read()
else:
res = super().read(size)
if timeout:
self.timeout = _timeout
return res
def write(self, b: "ReadableBuffer") -> Optional[int]:
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
if self._write_handle is None:
raise IOError("write_handle not open")
self._write_handle.write(b, withResponse=True)
return len(b)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.device.disconnect()
del self.device
def seekable(self) -> bool:
return False
def readable(self) -> bool:
return True
def writable(self) -> bool:
return True

458
srnemqtt/solar_ble.py Executable file
View file

@ -0,0 +1,458 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import struct
import sys
import time
from decimal import Decimal
from io import RawIOBase
from typing import Callable, Collection, Optional, cast
from bluepy import btle
from libscrc import modbus
from feasycom_ble import BTLEUart
from solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem, DataName
from test_config import get_config, get_consumers
MAC = "DC:0D:30:9C:61:BA"
# write_service = "0000ffd0-0000-1000-8000-00805f9b34fb"
# read_service = "0000fff0-0000-1000-8000-00805f9b34fb"
ACTION_READ = 0x03
ACTION_WRITE = 0x03
POSSIBLE_MARKER = (0x01, 0xFD, 0xFE, 0xFF)
# get(255, 12, 2)
# "ff 03 00 0c 00 02"
CMD_GET_1 = b"\xff\x03\x00\x0c\x00\x02"
# > ff 03 04 20 20 20 20
# get(255, 12, 8)
# ff 03 00 0c 00 08
CMD_GET_MODEL = b"\xff\x03\x00\x0c\x00\x08"
# > ff 03 10 20 20 20 20 4d 4c 32 34 32 30 20 20 20 20 20 20
# Device SKU: ML2420
# get(255, 20, 4)
# ff 03 00 14 00 04
CMD_GET_VERSION = b"\xff\x03\x00\x14\x00\x04"
# > ff 03 08 00 04 02 00 02 00 00 03
# CC ?? 11 22 33 ?? 44 55 66
# Version: 4.2.0
# get(255, 24, 3)
# ff 03 00 18 00 03
CMD_GET_SERIAL = b"\xff\x03\x00\x18\x00\x03"
# > ff 03 06 3c 13 02 67 00 01
# CC 11 22 33 33 ?? ??
# SN: 60-19-0615
# get(255, 256, 7)
# ff 03 01 00 00 07
CMD_GET_BATTERY_STATE = b"\xff\x03\x01\x00\x00\x07"
# > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03
# CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88
# 1: Battery charge: 72 %
# 2: Battery voltage: 12.6 V
# 3: Battery current: 0.29 A
# 4: Internal temperature?
# 5: External temperature probe for battery signet 8bit: 13 degC
# 6: Load voltage: 12.6 V
# 7: Load current: 0.28 A
# 8: Load power: 3 W
# get(255, 263, 4)
# ff 03 01 07 00 04
CMD_GET_PANEL_STATUS = b"\xff\x03\x01\x07\x00\x04"
# > ff 03 08 00 c8 00 14 00 04 00 01
# CC 11 11 22 22 33 33 ?? ??
# 1: Panel voltage: 20.0 V
# 2: Panel current: 0.20 A
# 3: Panel power: 4 W
# Charging status?
# set(255, 266, 1 or 0)
# ff 06 01 0a 00 01
CMD_ENABLE_LOAD = b"\xff\x06\x01\x0a\x00\x01"
CMD_DISABLE_LOAD = b"\xff\x06\x01\x0a\x00\x00"
REG_LOAD_ENABLE = 0x010A
# get(255, 267, 21)
# ff 03 01 0b 00 15
CMD_GET_LOAD_PARAMETERS = b"\xff\x03\x01\x0b\x00\x15"
# > ff 03 2a 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00
# > 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
# > 00 00 00 00 00
# get(255, 288, 3)
# ff 03 01 20 00 03
CMD_GET_2 = b"\xff\x03\x01\x20\x00\x03"
# > ff 03 06 80 02 00 00 00 00
# CC 11 22 33 33 33 33
# 1: boolean flag?: 1
# 2: ?: 2
# 3: ?: 0
# get(255, 57345, 33)
# ff 03 e0 01 00 21
CMD_GET_BATTERY_PARAMETERS = b"\xff\x03\xe0\x01\x00\x21"
# > ff 03 42 07 d0 00 c8 ff 0c 00 02 00 a0 00 9b 00 92 00 90 00
# > 8a 00 84 00 7e 00 78 00 6f 00 6a 64 32 00 05 00 78 00 78 00
# > 1e 00 03 00 41 00 a3 00 4b 00 a3 00 00 00 00 00 00 00 00 00
# > 0f 00 05 00 05 00 04 01 00
# 33 * uint16
# get(1, 61440, 10)
# 01 03 f0 00 00 0a
CMD_GET_HISTORICAL_TODAY = b"\x01\x03\xf0\x00\x00\x0a"
CMD_GET_HISTORICAL_YESTERDAY = b"\x01\x03\xf0\x01\x00\x0a"
CMD_GET_HISTORICAL_D2 = b"\x01\x03\xf0\x02\x00\x0a"
CMD_GET_HISTORICAL_D3 = b"\x01\x03\xf0\x03\x00\x0a"
# ,- battery_min_voltage
# | ,- battery_max_voltage
# | | ,- ?1 max charge %?
# | | | ,- ?2
# | | | | ,- charge_max_power
# | | | | | ,- discharge_max_power
# | | | | | | ,- charge_amp_hour
# | | | | | | | ,- discharge_amp_hour
# | | | | | | | | ,- production_power
# | | | | | | | | | ,- consumption_power
# _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___ _|___
# > 01 03 14 00 7c 00 7f 00 51 00 20 00 0a 00 03 00 00 00 00 00 00 00 00
# > 01 03 14 00 7c 00 7f 00 53 00 20 00 0a 00 03 00 00 00 00 00 00 00 00
# battery_min_voltage = 12.4 V
# battery_max_voltage = 12.7 V
# ?1 = 83 % ?
# ?2 =
# charge_max_power = 10 W
# discharge_max_power = 3 W
# charge_amp_hour = 0 Ah
# discharge_amp_hour = 0 Ah
# production_power = 0 Wh
# consumption_power = 0 Wh
# ff 78 00 00 00 01
CMD_ = b"\xff\x78\x00\x00\x00\x01"
# CMD_GET_BATTERY_STATE = b'\xff\x03\x01\x00\x00\x07'
# > ff 03 0e 00 48 00 7e 00 1d 0e 0d 00 7e 00 1c 00 03
# CC 11 11 22 22 33 33 44 55 66 66 77 77 88 88
# 1: Battery charge: 72 %
# 2: Battery voltage: 12.6 V
# 3: Battery current: 0.29 A
# 4: Internal temperature?
# 5: External temperature probe for battery signed 8bit: 13 degC
# 6: Load voltage: 12.6 V
# 7: Load current: 0.28 A
# 8: Load power: 3 W
# CMD_GET_PANEL_STATUS = b'\xff\x03\x01\x07\x00\x04'
# > ff 03 08 00 c8 00 14 00 04 00 01
# CC 11 11 22 22 33 33 ?? ??
# > ff 03 08 00 00 00 00 00 00 00 00
# 1: Panel voltage: 20.0 V
# 2: Panel current: 0.20 A
# 3: Panel power: 4 W
# ?: load_enabled
# Only factor of 1000
SI_PREFIXES_LARGE = "kMGTPEZY"
SI_PREFIXES_SMALL = "mµnpfazy"
def humanize_number(data, unit: str = ""):
counter = 0
while data >= 1000:
data /= 1000
counter += 1
if counter >= len(SI_PREFIXES_LARGE):
break
while data < 1:
data *= 1000
counter -= 1
if abs(counter) >= len(SI_PREFIXES_SMALL):
break
if not counter:
prefix = ""
elif counter > 0:
prefix = SI_PREFIXES_LARGE[counter - 1]
elif counter < 0:
prefix = SI_PREFIXES_SMALL[abs(counter) - 1]
return f"{data:.3g} {prefix}{unit}"
def parse(data: bytes, items: Collection[DataItem], offset: int = 0) -> dict:
pos = offset
res = {}
for i in items:
res[i.name] = i.transform(struct.unpack_from(i.st_format, data, offset=pos)[0])
pos += i.st_size
return res
# GET_BATTERY_STATE
def parse_battery_state(data: bytes) -> dict:
return parse(data, DATA_BATTERY_STATE)
def parse_historical_entry(data: bytes) -> dict:
res = parse(data, HISTORICAL_DATA[:10])
res_datalen = sum([x.st_size for x in HISTORICAL_DATA[:10]])
if len(data) > res_datalen:
res.update(parse(data, HISTORICAL_DATA[10:], offset=res_datalen))
return res
def write(fh, data):
bdata = bytes(data)
crc = modbus(bdata)
bcrc = bytes([crc & 0xFF, (crc & 0xFF00) >> 8])
fh.write(data + bcrc)
def construct_request(address, words=1, action=ACTION_READ, marker=0xFF):
assert marker in POSSIBLE_MARKER, f"marker should be one of {POSSIBLE_MARKER}"
return struct.pack("!BBHH", marker, action, address, words)
def log(*message: object, **kwargs):
print(datetime.datetime.utcnow().isoformat(" "), *message, **kwargs)
sys.stdout.flush()
def parse_packet(data):
tag, operation, size = struct.unpack_from("BBB", data)
_unpacked = struct.unpack_from(f"<{size}BH", data, offset=3)
crc = _unpacked[-1]
payload = _unpacked[:-1]
calculated_crc = modbus(bytes([tag, operation, size, *payload]))
if crc != calculated_crc:
e = ValueError(f"CRC missmatch: expected {crc:04X}, got {calculated_crc:04X}.")
e.tag = tag
e.operation = operation
e.size = size
e.payload = payload
e.crc = crc
e.calculated_crc = calculated_crc
raise e
return payload
def discardUntil(fh: RawIOBase, byte: int, timeout=10) -> Optional[int]:
assert byte >= 0 and byte < 256, f"byte: Expected 8bit unsigned int, got {byte}"
def expand(b: Optional[bytes]):
if b is None:
return b
return b[0]
start = time.time()
discarded = 0
read_byte = expand(fh.read(1))
while read_byte != byte:
if read_byte is not None:
if not discarded:
log("Discarding", end="")
discarded += 1
print(f" {read_byte:02X}", end="")
sys.stdout.flush()
if time.time() - start > timeout:
read_byte = None
break
read_byte = expand(fh.read(1))
if discarded:
print()
sys.stdout.flush()
return read_byte
def readMemory(fh: RawIOBase, address: int, words: int = 1) -> Optional[bytes]:
# log(f"Reading {words} words from 0x{address:04X}")
request = construct_request(address, words=words)
# log("Request:", request)
write(fh, request)
tag = discardUntil(fh, 0xFF)
if tag is None:
return None
header = fh.read(2)
if header and len(header) == 2:
operation, size = header
data = fh.read(size)
_crc = fh.read(2)
if data and _crc:
try:
crc = struct.unpack_from("<H", _crc)[0]
except struct.error:
log(f"readMemory: CRC error; read {len(_crc)} bytes (2 expected)")
return None
calculated_crc = modbus(bytes([tag, operation, size, *data]))
if crc == calculated_crc:
return data
else:
log(f"readMemory: CRC error; {crc:04X} != {calculated_crc:04X}")
log("data or crc is falsely", header, data, _crc)
return None
class Periodical:
prev: float
interval: float
def __init__(self, interval: float, start: Optional[float] = None):
self.prev = time.time() - interval if start is None else start
self.interval = interval
def __call__(self, now: Optional[float] = None) -> bool:
if now is None:
now = time.time()
if (now - self.prev) >= self.interval:
skipped, overshoot = divmod(now - self.prev, self.interval)
skipped -= 1
if skipped:
log("Skipped:", skipped, overshoot, now - self.prev, self.interval)
self.prev = now - overshoot
return True
return False
def try_read_parse(
dev: BTLEUart,
address: int,
words: int = 1,
parser: Callable = None,
attempts=5,
) -> Optional[dict]:
while attempts:
attempts -= 1
res = readMemory(dev, address, words)
if res:
try:
if parser:
return parser(res)
except struct.error as e:
log(e)
log("0x0100 Unpack error:", len(res), res)
log("Flushed from read buffer; ", dev.read(timeout=0.5))
else:
log(f"No data read, expected {words*2} bytes (attempts left: {attempts})")
return None
if __name__ == "__main__":
conf = get_config()
consumers = get_consumers(conf)
per_voltages = Periodical(interval=15)
per_current_hist = Periodical(interval=60)
try:
while True:
try:
log("Connecting...")
with BTLEUart(MAC, timeout=5) as dev:
log("Connected.")
# write(dev, construct_request(0, 32))
# Memory dump
# for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16))
days = 7
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
if res:
log(res)
for consumer in consumers:
consumer.write(res)
days = cast(int, res.get("run_days", 7))
for i in range(days):
res = try_read_parse(
dev, 0xF000 + i, 10, parse_historical_entry
)
if res:
log({i: res})
for consumer in consumers:
consumer.write({str(i): res})
while True:
now = time.time()
if per_voltages(now):
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
if data:
data[DataName.CALCULATED_BATTERY_POWER] = float(
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(
str(data.get(DataName.BATTERY_CURRENT, 0))
)
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
log(data)
for consumer in consumers:
consumer.write(data)
if per_current_hist(now):
data = try_read_parse(
dev, 0x010B, 21, parse_historical_entry
)
if data:
log(data)
for consumer in consumers:
consumer.write(data)
# print(".")
for consumer in consumers:
consumer.poll()
time.sleep(max(0, 1 - time.time() - now))
# if STATUS.get('load_enabled'):
# write(wd, CMD_DISABLE_LOAD)
# else:
# write(wd, CMD_ENABLE_LOAD)
except btle.BTLEDisconnectError:
log("ERROR: Disconnected")
time.sleep(1)
except (KeyboardInterrupt, SystemExit, Exception) as e:
for consumer in consumers:
consumer.exit()
if type(e) is not KeyboardInterrupt:
raise

118
srnemqtt/solar_types.py Normal file
View file

@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
import struct
from enum import Enum, unique
from typing import Callable, Optional
@unique
class DataName(str, Enum):
BATTERY_CHARGE = "battery_charge"
BATTERY_VOLTAGE = "battery_voltage"
BATTERY_CURRENT = "battery_current"
INTERNAL_TEMPERATURE = "internal_temperature"
BATTERY_TEMPERATURE = "battery_temperature"
LOAD_VOLTAGE = "load_voltage"
LOAD_CURRENT = "load_current"
LOAD_POWER = "load_power"
PANEL_VOLTAGE = "panel_voltage"
PANEL_CURRENT = "panel_current"
PANEL_POWER = "panel_power"
LOAD_ENABLED = "load_enabled"
BATTERY_VOLTAGE_MIN = "battery_voltage_min"
BATTERY_VOLTAGE_MAX = "battery_voltage_max"
CHARGE_MAX_CURRENT = "charge_max_current"
_DISCHARGE_MAX_CURRENT = "_discharge_max_current?"
CHARGE_MAX_POWER = "charge_max_power"
DISCHARGE_MAX_POWER = "discharge_max_power"
CHARGE_AMP_HOUR = "charge_amp_hour"
DISCHARGE_AMP_HOUR = "discharge_amp_hour"
PRODUCTION_ENERGY = "production_energy"
CONSUMPTION_ENERGY = "consumption_energy"
RUN_DAYS = "run_days"
DISCHARGE_COUNT = "discharge_count"
FULL_CHARGE_COUNT = "full_charge_count"
TOTAL_CHARGE_AMP_HOURS = "total_charge_amp_hours"
TOTAL_DISCHARGE_AMP_HOURS = "total_discharge_amp_hours"
TOTAL_PRODUCTION_ENERGY = "total_production_energy"
TOTAL_CONSUMPTION_ENERGY = "total_consumption_energy"
CALCULATED_BATTERY_POWER = "calculated_battery_power"
CALCULATED_PANEL_POWER = "calculated_panel_power"
CALCULATED_LOAD_POWER = "calculated_load_power"
def __repr__(self):
return repr(self.value)
class DataItem:
name: DataName
st_format: str
unit: Optional[str]
transformation: Optional[Callable]
def __init__(
self,
name: DataName,
st_format: str,
unit: Optional[str] = None,
transform: Optional[Callable] = None,
):
self.name = name
self.st_format = st_format
self.unit = unit
self.transformation = transform
if self.st_format[0] not in "@=<>!":
self.st_format = "!" + self.st_format
@property
def st_size(self) -> int:
return struct.calcsize(self.st_format)
def transform(self, data):
if self.transformation is None:
return data
return self.transformation(data)
def parse_temperature(bin):
if bin & 0x80:
return (bin & 0x7F) * -1
return bin & 0x7F
DATA_BATTERY_STATE = [
DataItem(DataName.BATTERY_CHARGE, "H", "%"),
DataItem(DataName.BATTERY_VOLTAGE, "H", "V", lambda n: n / 10),
DataItem(DataName.BATTERY_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.INTERNAL_TEMPERATURE, "B", "°C", parse_temperature),
DataItem(DataName.BATTERY_TEMPERATURE, "B", "°C", parse_temperature),
DataItem(DataName.LOAD_VOLTAGE, "H", "V", lambda n: n / 10),
DataItem(DataName.LOAD_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.LOAD_POWER, "H", "W"),
DataItem(DataName.PANEL_VOLTAGE, "H", "V", lambda n: n / 10),
DataItem(DataName.PANEL_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.PANEL_POWER, "H", "W"),
DataItem(DataName.LOAD_ENABLED, "x?", transform=bool),
]
HISTORICAL_DATA = [
DataItem(DataName.BATTERY_VOLTAGE_MIN, "H", "V", lambda n: n / 10),
DataItem(DataName.BATTERY_VOLTAGE_MAX, "H", "V", lambda n: n / 10),
DataItem(DataName.CHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName._DISCHARGE_MAX_CURRENT, "H", "A", lambda n: n / 100),
DataItem(DataName.CHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.DISCHARGE_MAX_POWER, "H", "W"),
DataItem(DataName.CHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.DISCHARGE_AMP_HOUR, "H", "Ah"),
DataItem(DataName.PRODUCTION_ENERGY, "H", "Wh"),
DataItem(DataName.CONSUMPTION_ENERGY, "H", "Wh"),
DataItem(DataName.RUN_DAYS, "H"),
DataItem(DataName.DISCHARGE_COUNT, "H"),
DataItem(DataName.FULL_CHARGE_COUNT, "H"),
DataItem(DataName.TOTAL_CHARGE_AMP_HOURS, "L", "Ah"),
DataItem(DataName.TOTAL_DISCHARGE_AMP_HOURS, "L", "Ah"),
DataItem(DataName.TOTAL_PRODUCTION_ENERGY, "L", "Wh"),
DataItem(DataName.TOTAL_CONSUMPTION_ENERGY, "L", "Wh"),
]

72
srnemqtt/test_config.py Normal file
View file

@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
import importlib
import os
from time import sleep
from typing import Any, Dict, List, Optional, Type
import yaml
from consumers import BaseConsumer
def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
mod_name, cls_name = name.rsplit(".", 1)
mod = importlib.import_module(f"consumers.{mod_name}")
# print(mod)
# print(dir(mod))
res = getattr(mod, cls_name)
assert issubclass(res, BaseConsumer)
return res
def get_config() -> Dict[str, Any]:
with open("config.yaml", "r") as fh:
conf: dict = yaml.safe_load(fh)
conf.setdefault("consumers", {})
return conf
def write_config(conf: Dict[str, Any]):
with open(".config.yaml~writing", "w") as fh:
yaml.safe_dump(conf, fh, indent=2, encoding="utf-8")
os.rename(".config.yaml~writing", "config.yaml")
def get_consumers(conf: Optional[Dict[str, Any]] = None) -> List[BaseConsumer]:
if conf is None:
conf = get_config()
consumers = []
for name, consumer_config in conf["consumers"].items():
# print(name, consumer_config)
mod = get_consumer(name)
if mod:
# print(mod)
consumers.append(mod(consumer_config))
write_config(conf)
return consumers
if __name__ == "__main__":
conf = get_config()
consumers = get_consumers(conf)
try:
while True:
for consumer in consumers:
consumer.poll()
sleep(1)
except (KeyboardInterrupt, SystemExit, Exception) as e:
for consumer in consumers:
consumer.exit()
if type(e) is not KeyboardInterrupt:
raise
write_config(conf)