#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Entry point: poll an SRNE device and forward its readings to consumers.

Connects through the configured interface, publishes the historical entries
once per connection, then loops reading the battery state and the current
historical entry on fixed intervals. Reconnects when the link drops.
"""
import time
from typing import Any, List, Optional, cast

from bluepy.btle import BTLEDisconnectError
from serial import SerialException

from srnemqtt.consumers import BaseConsumer

from .config import get_config, get_consumers, get_interface
from .srne import Srne
from .util import Periodical, log

# Target duration of one polling-loop iteration, in seconds.
_LOOP_INTERVAL = 1.0

# Number of historical entries fetched when the device does not report
# a "run_days" value.
_DEFAULT_HISTORY_DAYS = 7


class CommunicationError(BTLEDisconnectError, SerialException, TimeoutError):
    """Exception type deriving from the BLE, serial and timeout errors.

    Kept for backward compatibility. Note that ``except CommunicationError``
    does NOT match plain instances of its base classes, so the main loop
    catches ``_RECOVERABLE_ERRORS`` instead.
    """


# Errors after which the connection is re-established. CommunicationError is
# a subclass of every entry, so it is covered as well.
_RECOVERABLE_ERRORS = (BTLEDisconnectError, SerialException, TimeoutError)


def _broadcast(consumers: List[BaseConsumer], payload: Any) -> None:
    """Pass ``payload`` to the ``write`` method of every consumer."""
    for consumer in consumers:
        consumer.write(payload)


def _publish_history(srne: Srne, consumers: List[BaseConsumer]) -> None:
    """Publish the current historical entry, then the per-index entries.

    The number of indexed entries requested is the ``run_days`` value of the
    current entry when one was returned, else ``_DEFAULT_HISTORY_DAYS``.
    """
    days = _DEFAULT_HISTORY_DAYS
    current = srne.get_historical_entry()
    if current:
        log(current)
        _broadcast(consumers, current)
        days = cast(int, current.get("run_days", _DEFAULT_HISTORY_DAYS))

    for i in range(days):
        entry = srne.get_historical_entry(i)
        if entry:
            log({i: entry})
            # Consumers receive the index as a string key.
            _broadcast(consumers, {str(i): entry})


def main() -> None:
    """Run the connect / poll / publish loop until interrupted.

    On a BLE disconnect, serial error or timeout the loop logs the failure,
    waits one second and reconnects. ``KeyboardInterrupt`` ends the program
    quietly; any other exception propagates. In every case each consumer's
    ``exit`` method is called before returning or raising.
    """
    conf = get_config()
    consumers: Optional[List[BaseConsumer]] = None

    # Each Periodical is called with the current time and gates one kind of
    # read (intervals presumably in seconds -- see Periodical).
    per_voltages = Periodical(interval=15)
    per_current_hist = Periodical(interval=60)

    try:
        while True:
            try:
                log("Connecting...")
                with get_interface() as dev:
                    srne = Srne(dev)
                    log("Connected.")

                    # Consumers are created once and reused across reconnects.
                    # NOTE(review): they keep the first Srne instance they
                    # were given -- confirm that is intended.
                    if consumers is None:
                        consumers = get_consumers(srne, conf)

                    _publish_history(srne, consumers)

                    while True:
                        now = time.time()

                        if per_voltages(now):
                            data = srne.get_battery_state()
                            if data:
                                log(data)
                                _broadcast(consumers, data)

                        if per_current_hist(now):
                            try:
                                data = srne.get_historical_entry()
                                # Skip empty results, as every other read
                                # path does.
                                if data:
                                    log(data)
                                    _broadcast(consumers, data)
                            except TimeoutError:
                                # Best effort: a timed-out periodic read is
                                # simply retried on the next interval.
                                pass

                        for consumer in consumers:
                            consumer.poll()

                        # Sleep for whatever remains of this iteration's time
                        # budget. The parentheses matter: without them the
                        # expression is always negative and the loop spins.
                        elapsed = time.time() - now
                        time.sleep(max(0.0, _LOOP_INTERVAL - elapsed))

            except _RECOVERABLE_ERRORS:
                log("ERROR: Disconnected")
                time.sleep(1)

    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; exit without a traceback.
        pass
    finally:
        if consumers is not None:
            for consumer in consumers:
                consumer.exit()


if __name__ == "__main__":
    main()