srne-mqtt/test_config.py

65 lines
1.4 KiB
Python
Raw Normal View History

2021-11-14 00:55:43 +00:00
# -*- coding: utf-8 -*-
import importlib
import os
from time import sleep
from typing import Any, Dict, Optional, Type
import yaml
from consumers import BaseConsumer
def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
mod_name, cls_name = name.rsplit(".", 1)
mod = importlib.import_module(f"consumers.{mod_name}")
print(mod)
print(dir(mod))
res = getattr(mod, cls_name)
assert issubclass(res, BaseConsumer)
return res
def get_config() -> Dict[str, Any]:
with open("config.yaml", "r") as fh:
conf: dict = yaml.safe_load(fh)
conf.setdefault("consumers", {})
return conf
def write_config(conf: Dict[str, Any]):
with open(".config.yaml~writing", "w") as fh:
yaml.safe_dump(conf, fh, indent=2, encoding="utf-8")
os.rename(".config.yaml~writing", "config.yaml")
conf = get_config()
consumers = []
for name, consumer_config in conf["consumers"].items():
print(name, consumer_config)
mod = get_consumer(name)
if mod:
print(mod)
consumers.append(mod(consumer_config))
write_config(conf)
try:
while True:
for consumer in consumers:
consumer.poll()
sleep(1)
except (KeyboardInterrupt, SystemExit, Exception) as e:
for consumer in consumers:
consumer.exit()
if type(e) is not KeyboardInterrupt:
raise
write_config(conf)