srne-mqtt/srnemqtt/config.py

101 lines
2.4 KiB
Python

# -*- coding: utf-8 -*-
import importlib
import os
from time import sleep
from typing import Any, Dict, List, Optional, Type
import yaml
from .consumers import BaseConsumer
from .interfaces import BaseInterface
from .srne import Srne
def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
    """Resolve a dotted ``module.ClassName`` string to a consumer class.

    Everything before the last dot is imported as a submodule of this
    package's ``consumers`` subpackage; the part after the last dot is looked
    up as an attribute of that module.

    Args:
        name: Dotted path of the form ``<module>.<ClassName>``.

    Returns:
        The class found under that name.

    Raises:
        ValueError: if ``name`` contains no dot.
        ImportError: if the module cannot be imported.
        AttributeError: if the module has no attribute ``<ClassName>``.
        AssertionError: if the attribute is not a ``BaseConsumer`` subclass.
    """
    module_path, class_name = name.rsplit(".", 1)
    module = importlib.import_module(
        f".consumers.{module_path}", package=__package__
    )
    consumer_cls = getattr(module, class_name)
    assert issubclass(consumer_cls, BaseConsumer)
    return consumer_cls
def get_config() -> Dict[str, Any]:
    """Load ``config.yaml`` from the current working directory.

    Returns:
        The parsed configuration. The ``"consumers"`` entry is always present
        and never ``None``: it is set to an empty dict when the file omits it
        or leaves it blank. An empty file yields ``{"consumers": {}}``.

    Raises:
        OSError: if ``config.yaml`` cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    with open("config.yaml", "r") as fh:
        conf = yaml.safe_load(fh)

    # safe_load returns None for an empty document; treat that as an empty
    # configuration instead of failing on None.setdefault().
    if conf is None:
        conf = {}

    # A bare "consumers:" key also parses to None, which would later break
    # iteration over conf["consumers"].items().
    if conf.get("consumers") is None:
        conf["consumers"] = {}
    return conf
def write_config(conf: Dict[str, Any]):
    """Persist *conf* as YAML to ``config.yaml`` in the working directory.

    The document is first dumped to the scratch file ``.config.yaml~writing``
    and only then moved over ``config.yaml``, so a failure while dumping does
    not leave a truncated configuration file behind.

    Args:
        conf: Configuration mapping to serialize with ``yaml.safe_dump``.

    Raises:
        OSError: if the scratch file cannot be written or moved into place.
    """
    scratch_path = ".config.yaml~writing"
    with open(scratch_path, "w") as fh:
        yaml.safe_dump(conf, fh, indent=2, encoding="utf-8")

    # os.replace overwrites an existing destination on every platform, whereas
    # os.rename raises FileExistsError on Windows when config.yaml exists.
    os.replace(scratch_path, "config.yaml")
def get_consumers(
    srne: Srne, conf: Optional[Dict[str, Any]] = None
) -> List[BaseConsumer]:
    """Instantiate every consumer listed under ``conf["consumers"]``.

    Each key of the ``consumers`` mapping is resolved to a class with
    ``get_consumer`` and constructed with its mapped value as ``settings``
    plus the shared ``srne`` object. Once all consumers exist, the
    configuration is written back to disk with ``write_config``
    (presumably so that settings adjusted by the consumers during
    construction are persisted -- verify against the consumer classes).

    Args:
        srne: Object handed to every consumer as ``srne``.
        conf: Already-loaded configuration; loaded via ``get_config`` when
            omitted.

    Returns:
        The constructed consumers, in configuration order.
    """
    if conf is None:
        conf = get_config()

    instances = []
    for dotted_name, settings in conf["consumers"].items():
        consumer_cls = get_consumer(dotted_name)
        if not consumer_cls:
            continue
        instances.append(consumer_cls(settings=settings, srne=srne))

    write_config(conf)
    return instances
def _get_interface(name: str) -> Type[BaseInterface]:
    """Resolve a dotted ``module.ClassName`` string to an interface class.

    Everything before the last dot is imported as a submodule of this
    package's ``interfaces`` subpackage; the part after the last dot is
    looked up as an attribute of that module.

    Args:
        name: Dotted path of the form ``<module>.<ClassName>``.

    Returns:
        The class found under that name.

    Raises:
        ValueError: if ``name`` contains no dot.
        ImportError: if the module cannot be imported.
        AttributeError: if the module has no attribute ``<ClassName>``.
        AssertionError: if the attribute is not a ``BaseInterface`` subclass.
    """
    module_path, class_name = name.rsplit(".", 1)
    module = importlib.import_module(
        f".interfaces.{module_path}", package=__package__
    )
    interface_cls = getattr(module, class_name)
    assert issubclass(interface_cls, BaseInterface)
    return interface_cls
def get_interface(conf: Optional[Dict[str, Any]] = None) -> BaseInterface:
    """Build the interface described by the ``interface`` config section.

    ``conf["interface"]["name"]`` is resolved to a class with
    ``_get_interface`` and instantiated with the optional
    ``conf["interface"]["params"]`` mapping expanded as keyword arguments.

    Args:
        conf: Already-loaded configuration; loaded via ``get_config`` when
            omitted.

    Returns:
        The constructed interface instance.

    Raises:
        KeyError: if the ``interface`` section or its ``name`` is missing.
    """
    if conf is None:
        conf = get_config()

    interface_conf = conf["interface"]
    name = interface_conf["name"]

    # A bare "params:" key parses to None in YAML; treat it like a missing
    # key so the class is built with no keyword arguments instead of failing
    # on ``**None``.
    params = interface_conf.get("params")
    if params is None:
        params = {}

    mod = _get_interface(name)
    assert mod
    return mod(**params)
if __name__ == "__main__":
    # Script entry point: build the consumers around a bare BaseInterface and
    # poll each of them once per second until something interrupts the loop.
    conf = get_config()
    consumers = get_consumers(Srne(BaseInterface()), conf)

    try:
        while True:
            for active in consumers:
                active.poll()
            sleep(1)
    except (KeyboardInterrupt, SystemExit, Exception) as e:
        # Give every consumer a chance to shut down, whatever stopped the loop.
        for active in consumers:
            active.exit()

        # Only a plain Ctrl-C counts as a clean stop; anything else propagates.
        stopped_by_user = type(e) is KeyboardInterrupt
        if not stopped_by_user:
            raise

    # Reached only after a clean stop: persist the configuration.
    write_config(conf)