srne-mqtt/srnemqtt/__main__.py

118 lines
4.2 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from decimal import Decimal
from typing import cast
from bluepy.btle import BTLEDisconnectError
from serial import SerialException
from .config import get_config, get_consumers, get_interface
from .protocol import parse_battery_state, parse_historical_entry, try_read_parse
from .solar_types import DataName
from .util import Periodical, log
class CommunicationError(BTLEDisconnectError, SerialException, IOError):
pass
def main():
conf = get_config()
consumers = get_consumers(conf)
per_voltages = Periodical(interval=15)
per_current_hist = Periodical(interval=60)
# import serial
# ser = serial.Serial()
try:
while True:
try:
log("Connecting...")
with get_interface() as dev:
log("Connected.")
# write(dev, construct_request(0, 32))
# Memory dump
# for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16))
days = 7
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
if res:
log(res)
for consumer in consumers:
consumer.write(res)
days = cast(int, res.get("run_days", 7))
for i in range(days):
res = try_read_parse(
dev, 0xF000 + i, 10, parse_historical_entry
)
if res:
log({i: res})
for consumer in consumers:
consumer.write({str(i): res})
while True:
now = time.time()
if per_voltages(now):
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
if data:
data[DataName.CALCULATED_BATTERY_POWER] = float(
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(
str(data.get(DataName.BATTERY_CURRENT, 0))
)
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
log(data)
for consumer in consumers:
consumer.write(data)
if per_current_hist(now):
data = try_read_parse(
dev, 0x010B, 21, parse_historical_entry
)
if data:
log(data)
for consumer in consumers:
consumer.write(data)
# print(".")
for consumer in consumers:
consumer.poll()
time.sleep(max(0, 1 - time.time() - now))
# if STATUS.get('load_enabled'):
# write(wd, CMD_DISABLE_LOAD)
# else:
# write(wd, CMD_ENABLE_LOAD)
except CommunicationError:
log("ERROR: Disconnected")
time.sleep(1)
except (KeyboardInterrupt, SystemExit, Exception) as e:
for consumer in consumers:
consumer.exit()
if type(e) is not KeyboardInterrupt:
raise
if __name__ == "__main__":
main()