srne-mqtt/srnemqtt/config.py

123 lines
3.1 KiB
Python

# -*- coding: utf-8 -*-
import importlib
import os
from time import sleep
from typing import Any, Dict, List, Optional, Type
import yaml
from .consumers import BaseConsumer
from .interfaces import BaseInterface
def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
    """Resolve a dotted ``module.ClassName`` string to a consumer class.

    Everything before the last dot names a submodule of this package's
    ``consumers`` subpackage; the part after it names an attribute of that
    submodule.

    Args:
        name: Dotted path relative to the ``consumers`` subpackage.

    Returns:
        The attribute that was looked up. It is asserted to be a subclass of
        ``BaseConsumer``; no code path in this function returns ``None``.

    Raises:
        ValueError: If ``name`` contains no dot (the split cannot be unpacked).
        ImportError: If the submodule cannot be imported.
        AttributeError: If the submodule has no attribute with that name.
        TypeError: If the attribute is not a class.
        AssertionError: If the class does not derive from ``BaseConsumer``
            (only while assertions are enabled).
    """
    module_path, class_name = name.rsplit(".", 1)
    module = importlib.import_module(
        f".consumers.{module_path}",
        package=__package__,
    )
    consumer_cls = getattr(module, class_name)
    assert issubclass(consumer_cls, BaseConsumer)
    return consumer_cls
def get_config() -> Dict[str, Any]:
    """Load ``config.yaml`` from the working directory and fill in defaults.

    Keys already present in the file are never overwritten; only missing
    ones are added. The ``logging`` section uses the key layout of
    ``logging.config.dictConfig`` (``version``, ``handlers``, ``formatters``,
    ``loggers``).

    Returns:
        The parsed configuration mapping with a ``consumers`` mapping and a
        ``logging`` section guaranteed to exist.

    Raises:
        OSError: If ``config.yaml`` cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
    """
    with open("config.yaml", "r", encoding="utf-8") as fh:
        conf = yaml.safe_load(fh)

    # An empty (or comment-only) YAML document loads as None; treat it as an
    # empty mapping so the defaults below can still be applied.
    if conf is None:
        conf = {}

    conf.setdefault("consumers", {})

    # Named ``log_conf`` rather than ``logging`` to avoid shadowing the
    # standard-library module name.
    log_conf = conf.setdefault("logging", {})
    log_conf.setdefault("version", 1)
    log_conf.setdefault("disable_existing_loggers", False)
    log_conf.setdefault(
        "handlers",
        {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": "default",
                "level": "INFO",
                "stream": "ext://sys.stdout",
            },
        },
    )
    # ``formatters`` maps a formatter id to its settings. The console handler
    # above refers to the id "default", so the settings must be nested under
    # that key rather than placed directly in ``formatters``.
    log_conf.setdefault(
        "formatters",
        {
            "default": {
                "format": "%(asctime)s %(levelname)-8s %(name)-15s %(message)s",
                "datefmt": "%Y-%m-%d %H:%M:%S",
            },
        },
    )

    # NOTE(review): dictConfig configures the root logger through a top-level
    # "root" key; an entry called "root" under "loggers" only resolves to the
    # real root logger on Python 3.9+. Kept as-is to preserve the existing
    # config layout - confirm this is intended.
    loggers = log_conf.setdefault("loggers", {})
    loggers.setdefault("root", {"handlers": ["console"], "level": "DEBUG"})

    return conf
def write_config(conf: Dict[str, Any]):
    """Persist ``conf`` as YAML to ``config.yaml`` in the working directory.

    The document is first dumped to a scratch file, flushed and fsynced, and
    only then moved over ``config.yaml`` with ``os.replace``, so the target
    is swapped in one step instead of being rewritten in place.

    Args:
        conf: Configuration mapping to serialize with ``yaml.safe_dump``.
    """
    scratch_path = ".config.yaml~writing"
    target_path = "config.yaml"

    with open(scratch_path, "w") as out:
        yaml.safe_dump(conf, out, indent=2, encoding="utf-8")
        # Push Python's buffer to the OS, then ask the OS to commit it to
        # disk before the scratch file takes the place of the real one.
        out.flush()
        os.fsync(out.fileno())

    os.replace(scratch_path, target_path)
def get_consumers(conf: Optional[Dict[str, Any]] = None) -> List[BaseConsumer]:
    """Instantiate every consumer listed under ``conf["consumers"]``.

    Each key of the ``consumers`` mapping is resolved to a class with
    ``get_consumer`` and called with the corresponding value as its single
    argument. Afterwards the whole configuration is written back to disk.

    Args:
        conf: Configuration mapping; loaded with ``get_config`` when omitted.

    Returns:
        The constructed consumer instances, in mapping order.
    """
    settings = get_config() if conf is None else conf

    instances = []
    for dotted_name, options in settings["consumers"].items():
        consumer_cls = get_consumer(dotted_name)
        if not consumer_cls:
            continue
        instances.append(consumer_cls(options))

    # Presumably persists anything the consumers added to their option
    # mappings while being constructed - confirm against the consumer classes.
    write_config(settings)
    return instances
def _get_interface(name: str) -> Type[BaseInterface]:
    """Resolve a dotted ``module.ClassName`` string to an interface class.

    Everything before the last dot names a submodule of this package's
    ``interfaces`` subpackage; the part after it names an attribute of that
    submodule.

    Args:
        name: Dotted path relative to the ``interfaces`` subpackage.

    Returns:
        The attribute that was looked up, asserted to be a subclass of
        ``BaseInterface``.

    Raises:
        ValueError: If ``name`` contains no dot (the split cannot be unpacked).
        ImportError: If the submodule cannot be imported.
        AttributeError: If the submodule has no attribute with that name.
        TypeError: If the attribute is not a class.
        AssertionError: If the class does not derive from ``BaseInterface``
            (only while assertions are enabled).
    """
    module_path, class_name = name.rsplit(".", 1)
    module = importlib.import_module(
        f".interfaces.{module_path}",
        package=__package__,
    )
    interface_cls = getattr(module, class_name)
    assert issubclass(interface_cls, BaseInterface)
    return interface_cls
def get_interface(conf: Optional[Dict[str, Any]] = None) -> BaseInterface:
    """Build the interface object described by ``conf["interface"]``.

    ``conf["interface"]["name"]`` is resolved to a class with
    ``_get_interface``; the optional ``conf["interface"]["params"]`` mapping
    is passed to that class as keyword arguments.

    Args:
        conf: Configuration mapping; loaded with ``get_config`` when omitted.

    Returns:
        A new instance of the configured interface class.

    Raises:
        KeyError: If the ``interface`` section or its ``name`` is missing.
    """
    settings = get_config() if conf is None else conf

    section = settings["interface"]
    dotted_name = section["name"]
    kwargs = section.get("params", {})

    interface_cls = _get_interface(dotted_name)
    assert interface_cls
    return interface_cls(**kwargs)
def _main() -> None:
    """Poll all configured consumers once per second until interrupted.

    On ``KeyboardInterrupt``, ``SystemExit`` or any ``Exception`` every
    consumer's ``exit()`` is called. Anything other than a plain
    ``KeyboardInterrupt`` is then re-raised; after a ``KeyboardInterrupt``
    the configuration is written back to disk.
    """
    conf = get_config()
    consumers = get_consumers(conf)

    try:
        while True:
            for active in consumers:
                active.poll()
            sleep(1)
    except (KeyboardInterrupt, SystemExit, Exception) as exc:
        # Give every consumer a chance to shut down before deciding whether
        # the error should propagate.
        for active in consumers:
            active.exit()

        # Only an exact KeyboardInterrupt counts as a clean stop.
        if type(exc) is not KeyboardInterrupt:
            raise

    write_config(conf)


if __name__ == "__main__":
    _main()