#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Command-line entry point.

Connects to a charge controller through the interface returned by
``get_interface()``, forwards its readings to every configured consumer,
and reconnects when the transport drops.
"""
import time

from bluepy.btle import BTLEDisconnectError  # type: ignore
from serial import SerialException  # type: ignore

from .config import get_config, get_consumers, get_interface
from .protocol import ChargeController
from .util import Periodical, log

# Transport-level failures that should trigger a reconnect instead of
# terminating the program.
_TRANSPORT_ERRORS = (BTLEDisconnectError, SerialException, IOError)


class CommunicationError(BTLEDisconnectError, SerialException, IOError):
    """Exception type deriving from every supported transport error.

    NOTE: ``except CommunicationError`` only matches this class and its
    subclasses; it does NOT match plain ``BTLEDisconnectError``,
    ``SerialException`` or ``IOError`` instances raised by the drivers.
    Use ``_TRANSPORT_ERRORS`` to catch those.
    """


def _broadcast(consumers, payload):
    """Hand ``payload`` to the ``write`` method of every consumer."""
    for consumer in consumers:
        consumer.write(payload)


def _publish_startup_data(cc, consumers):
    """Send the current totals and the per-day history once after connecting."""
    # ``extra`` is read before ``today`` to keep the original query order.
    extra = cc.extra
    days = extra.run_days
    snapshot = cc.today.as_dict()
    snapshot.update(extra.as_dict())
    del extra  # only needed for ``run_days`` and the merged snapshot
    _broadcast(consumers, snapshot)

    for day in range(days):
        record = cc.get_historical(day).as_dict()
        log({day: record})
        # Consumers receive the day index as a string key.
        _broadcast(consumers, {str(day): record})


def _poll_forever(cc, consumers, per_voltages, per_current_hist):
    """Publish periodic readings and poll consumers roughly once a second.

    Never returns normally; it ends only when an exception propagates.
    ``per_voltages`` / ``per_current_hist`` are called with the current
    timestamp and presumably return a truthy value when their interval
    has elapsed (see ``Periodical`` -- TODO confirm).
    """
    while True:
        now = time.time()

        if per_voltages(now):
            data = cc.state.as_dict()
            log(data)
            _broadcast(consumers, data)

        if per_current_hist(now):
            data = cc.today.as_dict()
            data.update(cc.extra.as_dict())
            log(data)
            _broadcast(consumers, data)

        for consumer in consumers:
            consumer.poll()

        # Sleep for whatever is left of a one-second cycle (never negative).
        time.sleep(max(0, 1 - (time.time() - now)))


def _run_session(consumers, per_voltages, per_current_hist):
    """Open the interface, announce the controller and stream its data."""
    log("Connecting...")
    with get_interface() as dev:
        log("Connected.")
        cc = ChargeController(dev)
        log(f"Controller model: {cc.model}")
        log(f"Controller version: {cc.version}")
        log(f"Controller serial: {cc.serial}")

        for consumer in consumers:
            consumer.controller = cc

        _publish_startup_data(cc, consumers)
        _poll_forever(cc, consumers, per_voltages, per_current_hist)


def main() -> None:
    """Run the monitoring loop until interrupted.

    A transport error (Bluetooth disconnect, serial error, I/O error) is
    logged and followed by a reconnect attempt after one second.  Every
    consumer's ``exit()`` is called before the function returns or
    re-raises.  ``KeyboardInterrupt`` is treated as a normal shutdown and
    swallowed; any other exception propagates to the caller.
    """
    conf = get_config()
    consumers = get_consumers(conf)

    # Intervals are compared against ``time.time()`` values, so they are
    # presumably seconds.
    per_voltages = Periodical(interval=15)
    per_current_hist = Periodical(interval=60)

    try:
        while True:
            try:
                _run_session(consumers, per_voltages, per_current_hist)
            except _TRANSPORT_ERRORS:
                # Catch the base transport errors themselves: catching only
                # CommunicationError (a subclass) would let real driver
                # exceptions escape and stop the program.
                log("ERROR: Disconnected")
                time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; exit quietly after cleanup.
        pass
    finally:
        for consumer in consumers:
            consumer.exit()


if __name__ == "__main__":
    main()