srne-mqtt/srnemqtt/config.py

98 lines
2.3 KiB
Python
Raw Normal View History

2021-11-14 00:55:43 +00:00
# -*- coding: utf-8 -*-
import importlib
import os
from time import sleep
2021-11-14 03:46:17 +00:00
from typing import Any, Dict, List, Optional, Type
2021-11-14 00:55:43 +00:00
import yaml
2023-01-07 17:24:41 +00:00
from .consumers import BaseConsumer
2023-12-09 15:35:45 +00:00
from .interfaces import BaseInterface
2021-11-14 00:55:43 +00:00
def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
    """Resolve a dotted ``module.ClassName`` string to a consumer class.

    The text before the last dot names a submodule of this package's
    ``consumers`` subpackage; the text after it names an attribute of that
    module, which must be a subclass of ``BaseConsumer``.

    Raises:
        ValueError: if *name* contains no dot.
        ImportError: if the submodule cannot be imported.
        AttributeError: if the submodule has no attribute with that name.
        AssertionError: if the attribute is not a ``BaseConsumer`` subclass.
    """
    module_name, class_name = name.rsplit(".", 1)
    module = importlib.import_module(
        f".consumers.{module_name}",
        package=__package__,
    )

    consumer_cls = getattr(module, class_name)
    assert issubclass(consumer_cls, BaseConsumer)
    return consumer_cls
def get_config() -> Dict[str, Any]:
    """Load ``config.yaml`` from the current working directory.

    The returned mapping always has a ``"consumers"`` entry that is a dict:
    an empty file (which YAML parses as ``None``) is treated as an empty
    mapping, and a missing or null ``consumers`` key is replaced with ``{}``.

    Returns:
        The parsed configuration mapping.

    Raises:
        OSError: if ``config.yaml`` cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    with open("config.yaml", "r") as fh:
        loaded = yaml.safe_load(fh)

    # safe_load() yields None for an empty document; fall back to an empty
    # mapping instead of failing on None.setdefault below.
    conf: dict = {} if loaded is None else loaded

    # A bare "consumers:" line parses to None, which setdefault() would keep.
    if conf.get("consumers") is None:
        conf["consumers"] = {}
    return conf
def write_config(conf: Dict[str, Any]):
    """Write *conf* to ``config.yaml`` via a scratch file.

    The YAML is dumped to ``.config.yaml~writing`` first and only moved over
    ``config.yaml`` once the dump has finished and the file is closed, so a
    failure while dumping leaves the existing configuration untouched.
    Both paths are relative to the current working directory.

    Raises:
        OSError: if the scratch file cannot be written or moved into place.
        yaml.YAMLError: if *conf* contains values ``safe_dump`` cannot
            represent.
    """
    with open(".config.yaml~writing", "w") as fh:
        yaml.safe_dump(conf, fh, indent=2, encoding="utf-8")

    # os.replace overwrites an existing destination on every platform;
    # os.rename raises FileExistsError on Windows when config.yaml exists.
    os.replace(".config.yaml~writing", "config.yaml")
2021-11-14 03:46:17 +00:00
def get_consumers(conf: Optional[Dict[str, Any]] = None) -> List[BaseConsumer]:
    """Instantiate every consumer listed under ``conf["consumers"]``.

    Each key of that mapping is a dotted name resolved with
    ``get_consumer``; the matching value is passed to the class as its sole
    constructor argument. When *conf* is omitted it is loaded with
    ``get_config``. After all consumers are built the configuration is
    written back with ``write_config``.

    Returns:
        The consumer instances, in configuration order.
    """
    if conf is None:
        conf = get_config()

    instances = []
    for dotted_name, settings in conf["consumers"].items():
        consumer_cls = get_consumer(dotted_name)
        if not consumer_cls:
            continue
        instances.append(consumer_cls(settings))

    # Presumably persists any changes consumers made to their settings
    # while being constructed -- confirm against the consumer classes.
    write_config(conf)
    return instances
2021-11-14 00:55:43 +00:00
2023-04-07 21:57:37 +00:00
def _get_interface(name: str) -> Type[BaseInterface]:
    """Resolve a dotted ``module.ClassName`` string to an interface class.

    The text before the last dot names a submodule of this package's
    ``interfaces`` subpackage; the text after it names an attribute of that
    module, which must be a subclass of ``BaseInterface``.

    Raises:
        ValueError: if *name* contains no dot.
        ImportError: if the submodule cannot be imported.
        AttributeError: if the submodule has no attribute with that name.
        AssertionError: if the attribute is not a ``BaseInterface`` subclass.
    """
    module_name, class_name = name.rsplit(".", 1)
    module = importlib.import_module(
        f".interfaces.{module_name}",
        package=__package__,
    )

    interface_cls = getattr(module, class_name)
    assert issubclass(interface_cls, BaseInterface)
    return interface_cls
def get_interface(conf: Optional[Dict[str, Any]] = None) -> BaseInterface:
    """Build the interface described by ``conf["interface"]``.

    ``conf["interface"]["name"]`` is resolved to a class with
    ``_get_interface``; the optional ``params`` mapping (default: empty) is
    expanded into keyword arguments for its constructor. When *conf* is
    omitted it is loaded with ``get_config``.

    Raises:
        KeyError: if ``conf`` has no ``"interface"`` entry or that entry
            has no ``"name"``.
    """
    if conf is None:
        conf = get_config()

    section = conf["interface"]
    name = section["name"]
    params = section.get("params", {})

    interface_cls = _get_interface(name)
    assert interface_cls
    return interface_cls(**params)
2021-11-14 03:46:17 +00:00
if __name__ == "__main__":
    # Script entry point: load the configuration, build the consumers and
    # poll them in an endless loop, pausing one second between rounds.
    # NOTE(review): block nesting below was reconstructed from a copy that
    # had lost its indentation -- confirm against the repository.
    conf = get_config()

    consumers = get_consumers(conf)

    try:
        while True:
            for consumer in consumers:
                consumer.poll()
            sleep(1)
    except (KeyboardInterrupt, SystemExit, Exception) as e:
        # Whatever stopped the loop, let every consumer shut down first.
        for consumer in consumers:
            consumer.exit()

        # Only an exact KeyboardInterrupt is treated as a normal stop;
        # everything else is re-raised after the consumers have exited.
        if type(e) is not KeyboardInterrupt:
            raise

    # Reached only after a KeyboardInterrupt (the loop never ends on its
    # own and other exceptions are re-raised): save the configuration.
    write_config(conf)