#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from decimal import Decimal
from typing import cast

from bluepy.btle import BTLEDisconnectError
from serial import SerialException

from .config import get_config, get_consumers
from .constants import MAC
from .lib.feasycom_ble import BTLEUart
from .protocol import parse_battery_state, parse_historical_entry, try_read_parse
from .solar_types import DataName
from .util import Periodical, log


class CommunicationError(BTLEDisconnectError, SerialException):
    """Exception type deriving from both transport error classes.

    Kept for backward compatibility.  Note that ``except CommunicationError``
    matches only this subclass (and its own subclasses), *not* plain
    ``BTLEDisconnectError`` / ``SerialException`` instances.
    """


# Exceptions that mean the link to the device was lost and a reconnect should
# be attempted.  Catching the base classes also covers CommunicationError.
_LINK_ERRORS = (BTLEDisconnectError, SerialException)

# Target duration of one pass of the polling loop, in seconds.
_LOOP_PERIOD = 1


def _write_all(consumers, payload):
    """Pass *payload* to the ``write`` method of every consumer."""
    for consumer in consumers:
        consumer.write(payload)


def _power(data, voltage_key, current_key):
    """Return ``data[voltage_key] * data[current_key]`` as a float.

    Missing keys count as 0.  The product is computed with ``Decimal`` on the
    string form of the values so that binary float rounding does not leak
    into the result before the final conversion.
    """
    volts = Decimal(str(data.get(voltage_key, 0)))
    amps = Decimal(str(data.get(current_key, 0)))
    return float(volts * amps)


def _read_startup_history(dev, consumers):
    """Read the current summary entry and the per-day entries once.

    The summary is read from address 0x010B; its ``run_days`` value (default
    7, also used when the read yields nothing) decides how many entries are
    then read from 0xF000 upwards.  Every non-empty result is logged and
    forwarded to the consumers.
    """
    days = 7
    summary = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
    if summary:
        log(summary)
        _write_all(consumers, summary)
        days = cast(int, summary.get("run_days", 7))

    for i in range(days):
        entry = try_read_parse(dev, 0xF000 + i, 10, parse_historical_entry)
        if entry:
            log({i: entry})
            # Consumers receive the index as a string key.
            _write_all(consumers, {str(i): entry})


def _poll_forever(dev, consumers, per_voltages, per_current_hist):
    """Poll the device until an exception ends the loop.

    Each pass reads the battery state when ``per_voltages`` reports it is due,
    re-reads the current summary entry when ``per_current_hist`` reports it is
    due, polls every consumer, and then sleeps for the remainder of the loop
    period.
    """
    while True:
        now = time.time()

        if per_voltages(now):
            data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
            if data:
                data[DataName.CALCULATED_BATTERY_POWER] = _power(
                    data, DataName.BATTERY_VOLTAGE, DataName.BATTERY_CURRENT
                )
                data[DataName.CALCULATED_PANEL_POWER] = _power(
                    data, DataName.PANEL_VOLTAGE, DataName.PANEL_CURRENT
                )
                data[DataName.CALCULATED_LOAD_POWER] = _power(
                    data, DataName.LOAD_VOLTAGE, DataName.LOAD_CURRENT
                )
                log(data)
                _write_all(consumers, data)

        if per_current_hist(now):
            data = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
            if data:
                log(data)
                _write_all(consumers, data)

        # print(".")
        for consumer in consumers:
            consumer.poll()

        # Sleep only for what is left of the period.  The parentheses matter:
        # `1 - time.time() - now` is always hugely negative, which made the
        # loop spin without ever sleeping.
        elapsed = time.time() - now
        time.sleep(max(0, _LOOP_PERIOD - elapsed))

        # Disabled: toggle the load output.
        # if STATUS.get('load_enabled'):
        #     write(wd, CMD_DISABLE_LOAD)
        # else:
        #     write(wd, CMD_ENABLE_LOAD)


def main():
    """Connect to the device and forward its readings to the consumers.

    Reconnects after a one second pause whenever the link drops.  Ctrl-C
    stops the program quietly; any other exception propagates.  In every case
    each consumer's ``exit`` method is called before returning or raising.
    """
    conf = get_config()
    consumers = get_consumers(conf)

    # Intervals are presumably seconds, since the schedulers are fed
    # time.time() values -- confirm against util.Periodical.
    per_voltages = Periodical(interval=15)
    per_current_hist = Periodical(interval=60)

    # import serial
    # ser = serial.Serial()

    try:
        while True:
            try:
                log("Connecting...")
                with BTLEUart(MAC, timeout=5) as dev:
                    log("Connected.")

                    # write(dev, construct_request(0, 32))

                    # Memory dump
                    # for address in range(0, 0x10000, 16):
                    #     log(f"Reading 0x{address:04X}...")
                    #     write(wd, construct_request(address, 16))

                    _read_startup_history(dev, consumers)
                    _poll_forever(dev, consumers, per_voltages, per_current_hist)
            except _LINK_ERRORS:
                # Catch the base exception types: a handler for the combined
                # CommunicationError subclass would never see a plain
                # BTLEDisconnectError or SerialException.
                log("ERROR: Disconnected")
                time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; exit without a traceback.
        pass
    finally:
        for consumer in consumers:
            consumer.exit()


if __name__ == "__main__":
    main()