# -*- coding: utf-8 -*-
"""Load the YAML configuration and build the consumers/interface it names."""
import importlib
import os
from time import sleep
from typing import Any, Dict, List, Optional, Type

import yaml

from srnemqtt.interfaces import BaseInterface

from .consumers import BaseConsumer

# The configuration is read from / written to the current working directory.
_CONFIG_PATH = "config.yaml"
# Scratch file the new configuration is written to before replacing the real one.
_CONFIG_TMP_PATH = ".config.yaml~writing"


def _load_class(subpackage: str, name: str, base: type) -> type:
    """Import a class from a sub-package of this package and validate its type.

    Args:
        subpackage: Sub-package of this package to look in (e.g. ``"consumers"``).
        name: Dotted ``"module.ClassName"`` path relative to ``subpackage``.
        base: Class the loaded object must be a subclass of.

    Returns:
        The attribute ``ClassName`` of the imported module.

    Raises:
        ValueError: If ``name`` contains no ``"."`` separator.
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no attribute ``ClassName``.
        TypeError: If the attribute is not a subclass of ``base``.
    """
    mod_name, cls_name = name.rsplit(".", 1)
    module = importlib.import_module(
        f".{subpackage}.{mod_name}", package=__package__
    )
    loaded = getattr(module, cls_name)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not (isinstance(loaded, type) and issubclass(loaded, base)):
        raise TypeError(f"{name!r} is not a subclass of {base.__name__}")
    return loaded


def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
    """Return the consumer class named by ``name``.

    Args:
        name: ``"module.ClassName"`` path relative to the ``consumers``
            sub-package.

    Returns:
        The ``BaseConsumer`` subclass found at that path.

    Raises:
        ValueError: If ``name`` contains no ``"."`` separator.
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no such attribute.
        TypeError: If the attribute is not a ``BaseConsumer`` subclass.
    """
    return _load_class("consumers", name, BaseConsumer)


def get_config() -> Dict[str, Any]:
    """Read ``config.yaml`` from the current working directory.

    Returns:
        The parsed configuration, guaranteed to contain a ``"consumers"`` key
        (an empty dict is inserted when the file does not define one).
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as fh:
        conf = yaml.safe_load(fh)
    # ``safe_load`` returns None for an empty document; treat it as empty config.
    if conf is None:
        conf = {}
    conf.setdefault("consumers", {})
    return conf


def write_config(conf: Dict[str, Any]):
    """Write ``conf`` to ``config.yaml``.

    The data is first dumped to a scratch file and then moved over the real
    file, so a failed dump never leaves a truncated ``config.yaml`` behind.

    Args:
        conf: Configuration mapping to serialise as YAML.
    """
    with open(_CONFIG_TMP_PATH, "w", encoding="utf-8") as fh:
        yaml.safe_dump(conf, fh, indent=2, encoding="utf-8")
    # ``os.replace`` overwrites an existing destination on every platform,
    # whereas ``os.rename`` fails on Windows when ``config.yaml`` exists.
    os.replace(_CONFIG_TMP_PATH, _CONFIG_PATH)


def get_consumers(conf: Optional[Dict[str, Any]] = None) -> List[BaseConsumer]:
    """Instantiate every consumer listed under ``conf["consumers"]``.

    Args:
        conf: Parsed configuration; loaded with ``get_config()`` when omitted.

    Returns:
        One consumer instance per entry, each constructed with that entry's
        configuration value as its only argument.
    """
    if conf is None:
        conf = get_config()

    consumers = []
    for name, consumer_config in conf["consumers"].items():
        consumer_cls = get_consumer(name)
        if consumer_cls:
            consumers.append(consumer_cls(consumer_config))

    # Persist the configuration after construction.
    # NOTE(review): presumably consumers may fill in defaults in their config
    # dict while being constructed -- confirm against BaseConsumer.
    write_config(conf)
    return consumers


def _get_interface(name: str) -> Type[BaseInterface]:
    """Return the interface class named by ``name``.

    Args:
        name: ``"module.ClassName"`` path relative to the ``interfaces``
            sub-package.

    Returns:
        The ``BaseInterface`` subclass found at that path.

    Raises:
        ValueError: If ``name`` contains no ``"."`` separator.
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no such attribute.
        TypeError: If the attribute is not a ``BaseInterface`` subclass.
    """
    return _load_class("interfaces", name, BaseInterface)


def get_interface(conf: Optional[Dict[str, Any]] = None) -> BaseInterface:
    """Instantiate the interface described by ``conf["interface"]``.

    Args:
        conf: Parsed configuration; loaded with ``get_config()`` when omitted.

    Returns:
        An instance of the class named by ``conf["interface"]["name"]``,
        constructed with ``conf["interface"]["params"]`` (if present) as
        keyword arguments.

    Raises:
        KeyError: If ``conf`` has no ``"interface"`` entry or it has no
            ``"name"``.
    """
    if conf is None:
        conf = get_config()

    interface_conf = conf["interface"]
    interface_cls = _get_interface(interface_conf["name"])
    return interface_cls(**interface_conf.get("params", {}))


def main() -> None:
    """Poll every configured consumer once per second until interrupted.

    On ``KeyboardInterrupt`` the consumers are shut down and the configuration
    is written back. Any other exception also shuts the consumers down but is
    then propagated without writing the configuration.
    """
    conf = get_config()
    consumers = get_consumers(conf)

    try:
        while True:
            for consumer in consumers:
                consumer.poll()
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to the final save.
        pass
    finally:
        # Runs for every exit path, so consumers are always shut down.
        for consumer in consumers:
            consumer.exit()

    write_config(conf)


if __name__ == "__main__":
    main()