Move main executable code to __main__

This commit is contained in:
Odd Stråbø 2023-01-07 19:02:34 +01:00
parent f7386d31d2
commit 59fc5e4fa9
2 changed files with 120 additions and 99 deletions

View File

@ -5,16 +5,13 @@ import datetime
import struct
import sys
import time
from decimal import Decimal
from io import RawIOBase
from typing import Callable, Collection, Optional, cast
from typing import Callable, Collection, Optional
from bluepy import btle
from libscrc import modbus
from .config import get_config, get_consumers
from .feasycom_ble import BTLEUart
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem, DataName
from .solar_types import DATA_BATTERY_STATE, HISTORICAL_DATA, DataItem
MAC = "DC:0D:30:9C:61:BA"
@ -346,7 +343,7 @@ def try_read_parse(
dev: BTLEUart,
address: int,
words: int = 1,
parser: Callable = None,
parser: Optional[Callable] = None,
attempts=5,
) -> Optional[dict]:
while attempts:
@ -363,96 +360,3 @@ def try_read_parse(
else:
log(f"No data read, expected {words*2} bytes (attempts left: {attempts})")
return None
if __name__ == "__main__":
conf = get_config()
consumers = get_consumers(conf)
per_voltages = Periodical(interval=15)
per_current_hist = Periodical(interval=60)
try:
while True:
try:
log("Connecting...")
with BTLEUart(MAC, timeout=5) as dev:
log("Connected.")
# write(dev, construct_request(0, 32))
# Memory dump
# for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16))
days = 7
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
if res:
log(res)
for consumer in consumers:
consumer.write(res)
days = cast(int, res.get("run_days", 7))
for i in range(days):
res = try_read_parse(
dev, 0xF000 + i, 10, parse_historical_entry
)
if res:
log({i: res})
for consumer in consumers:
consumer.write({str(i): res})
while True:
now = time.time()
if per_voltages(now):
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
if data:
data[DataName.CALCULATED_BATTERY_POWER] = float(
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(
str(data.get(DataName.BATTERY_CURRENT, 0))
)
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
log(data)
for consumer in consumers:
consumer.write(data)
if per_current_hist(now):
data = try_read_parse(
dev, 0x010B, 21, parse_historical_entry
)
if data:
log(data)
for consumer in consumers:
consumer.write(data)
# print(".")
for consumer in consumers:
consumer.poll()
time.sleep(max(0, 1 - time.time() - now))
# if STATUS.get('load_enabled'):
# write(wd, CMD_DISABLE_LOAD)
# else:
# write(wd, CMD_ENABLE_LOAD)
except btle.BTLEDisconnectError:
log("ERROR: Disconnected")
time.sleep(1)
except (KeyboardInterrupt, SystemExit, Exception) as e:
for consumer in consumers:
consumer.exit()
if type(e) is not KeyboardInterrupt:
raise

117
srnemqtt/__main__.py Executable file
View File

@ -0,0 +1,117 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from decimal import Decimal
from typing import cast
from bluepy import btle
from . import (
MAC,
Periodical,
log,
parse_battery_state,
parse_historical_entry,
try_read_parse,
)
from .config import get_config, get_consumers
from .feasycom_ble import BTLEUart
from .solar_types import DataName
def main():
conf = get_config()
consumers = get_consumers(conf)
per_voltages = Periodical(interval=15)
per_current_hist = Periodical(interval=60)
try:
while True:
try:
log("Connecting...")
with BTLEUart(MAC, timeout=5) as dev:
log("Connected.")
# write(dev, construct_request(0, 32))
# Memory dump
# for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16))
days = 7
res = try_read_parse(dev, 0x010B, 21, parse_historical_entry)
if res:
log(res)
for consumer in consumers:
consumer.write(res)
days = cast(int, res.get("run_days", 7))
for i in range(days):
res = try_read_parse(
dev, 0xF000 + i, 10, parse_historical_entry
)
if res:
log({i: res})
for consumer in consumers:
consumer.write({str(i): res})
while True:
now = time.time()
if per_voltages(now):
data = try_read_parse(dev, 0x0100, 11, parse_battery_state)
if data:
data[DataName.CALCULATED_BATTERY_POWER] = float(
Decimal(str(data.get(DataName.BATTERY_VOLTAGE, 0)))
* Decimal(
str(data.get(DataName.BATTERY_CURRENT, 0))
)
)
data[DataName.CALCULATED_PANEL_POWER] = float(
Decimal(str(data.get(DataName.PANEL_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.PANEL_CURRENT, 0)))
)
data[DataName.CALCULATED_LOAD_POWER] = float(
Decimal(str(data.get(DataName.LOAD_VOLTAGE, 0)))
* Decimal(str(data.get(DataName.LOAD_CURRENT, 0)))
)
log(data)
for consumer in consumers:
consumer.write(data)
if per_current_hist(now):
data = try_read_parse(
dev, 0x010B, 21, parse_historical_entry
)
if data:
log(data)
for consumer in consumers:
consumer.write(data)
# print(".")
for consumer in consumers:
consumer.poll()
time.sleep(max(0, 1 - time.time() - now))
# if STATUS.get('load_enabled'):
# write(wd, CMD_DISABLE_LOAD)
# else:
# write(wd, CMD_ENABLE_LOAD)
except btle.BTLEDisconnectError:
log("ERROR: Disconnected")
time.sleep(1)
except (KeyboardInterrupt, SystemExit, Exception) as e:
for consumer in consumers:
consumer.exit()
if type(e) is not KeyboardInterrupt:
raise
if __name__ == "__main__":
main()