# -*- coding: utf-8 -*-
"""Load consumer and interface classes named in ``config.yaml`` and poll them.

When executed as a script, the module builds the configured consumers and
calls ``poll()`` on each of them once per second until interrupted.
"""
import importlib
import os
from time import sleep
from typing import Any, Dict, List, Optional, Type

import yaml

from .consumers import BaseConsumer
from .interfaces import BaseInterface
from .srne import Srne


def _load_class(subpackage: str, name: str, base: Type[Any]) -> Type[Any]:
    """Import ``<subpackage>.<module>`` relative to this package and return a class.

    Args:
        subpackage: Sub-package of this package to import from
            (``"consumers"`` or ``"interfaces"``).
        name: Dotted ``"module.ClassName"`` path; the last component is the
            attribute looked up on the imported module.
        base: Class the resolved attribute must be a subclass of.

    Returns:
        The resolved class.

    Raises:
        ValueError: If ``name`` contains no ``"."`` separator.
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no attribute named ``ClassName``.
        TypeError: If the attribute is not a subclass of ``base``.
    """
    mod_name, sep, cls_name = name.rpartition(".")
    if not sep:
        raise ValueError(f"expected 'module.ClassName', got {name!r}")
    module = importlib.import_module(
        f".{subpackage}.{mod_name}", package=__package__
    )
    resolved = getattr(module, cls_name)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not (isinstance(resolved, type) and issubclass(resolved, base)):
        raise TypeError(f"{name!r} is not a subclass of {base.__name__}")
    return resolved


def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
    """Return the consumer class identified by ``name``.

    Args:
        name: ``"module.ClassName"`` path resolved below the ``consumers``
            sub-package.

    Returns:
        The ``BaseConsumer`` subclass found at that path.

    Raises:
        ValueError, ImportError, AttributeError, TypeError: See
            ``_load_class``.
    """
    return _load_class("consumers", name, BaseConsumer)


def get_config() -> Dict[str, Any]:
    """Read ``config.yaml`` from the current working directory.

    Returns:
        The parsed configuration; a ``"consumers"`` key is always present
        (defaulting to an empty dict).
    """
    with open("config.yaml", "r") as fh:
        conf = yaml.safe_load(fh)
    if conf is None:
        # ``safe_load`` yields None for an empty document; treat as no settings.
        conf = {}
    conf.setdefault("consumers", {})
    return conf


def write_config(conf: Dict[str, Any]) -> None:
    """Write ``conf`` to ``config.yaml`` via a temporary file.

    The data is dumped to ``.config.yaml~writing`` first and then moved over
    ``config.yaml`` so a failed dump does not truncate the existing file.

    Args:
        conf: Configuration mapping to serialise as YAML.
    """
    with open(".config.yaml~writing", "w") as fh:
        yaml.safe_dump(conf, fh, indent=2, encoding="utf-8")
    # ``os.replace`` overwrites an existing destination on every platform,
    # whereas ``os.rename`` raises on Windows when ``config.yaml`` exists.
    os.replace(".config.yaml~writing", "config.yaml")


def get_consumers(
    srne: Srne, conf: Optional[Dict[str, Any]] = None
) -> List[BaseConsumer]:
    """Instantiate every consumer listed under ``conf["consumers"]``.

    Args:
        srne: Object handed to each consumer as its ``srne`` keyword argument.
        conf: Configuration mapping; loaded with ``get_config()`` when omitted.

    Returns:
        The consumer instances, in configuration order.

    Side effects:
        Writes ``conf`` back to ``config.yaml`` after all consumers have been
        constructed (presumably to persist settings that consumers fill in
        during construction -- confirm against the consumer classes).
    """
    if conf is None:
        conf = get_config()

    consumers = []
    for name, consumer_config in conf["consumers"].items():
        consumer_cls = get_consumer(name)
        if not consumer_cls:
            continue
        consumers.append(consumer_cls(settings=consumer_config, srne=srne))

    write_config(conf)
    return consumers


def _get_interface(name: str) -> Type[BaseInterface]:
    """Return the interface class identified by ``name``.

    Args:
        name: ``"module.ClassName"`` path resolved below the ``interfaces``
            sub-package.

    Returns:
        The ``BaseInterface`` subclass found at that path.

    Raises:
        ValueError, ImportError, AttributeError, TypeError: See
            ``_load_class``.
    """
    return _load_class("interfaces", name, BaseInterface)


def get_interface(conf: Optional[Dict[str, Any]] = None) -> BaseInterface:
    """Build the interface described by ``conf["interface"]``.

    Args:
        conf: Configuration mapping; loaded with ``get_config()`` when omitted.
            ``conf["interface"]["name"]`` selects the class and the optional
            ``conf["interface"]["params"]`` mapping supplies constructor
            keyword arguments.

    Returns:
        An instance of the configured interface class.

    Raises:
        KeyError: If ``"interface"`` or its ``"name"`` entry is missing.
    """
    if conf is None:
        conf = get_config()
    interface_conf = conf["interface"]
    # A bare ``params:`` key in YAML parses to None; treat it like "no params".
    params = interface_conf.get("params") or {}
    interface_cls = _get_interface(interface_conf["name"])
    return interface_cls(**params)


def main() -> None:
    """Poll all configured consumers once per second until interrupted.

    Every consumer's ``exit()`` is called when the loop stops for any reason.
    On Ctrl-C the configuration is written back to disk and the function
    returns normally; any other exception propagates after the consumers
    have been shut down.
    """
    conf = get_config()
    # NOTE(review): this uses a bare ``BaseInterface()`` rather than
    # ``get_interface(conf)`` -- confirm whether that is intentional.
    consumers = get_consumers(Srne(BaseInterface()), conf)
    try:
        while True:
            for consumer in consumers:
                consumer.poll()
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to the final save.
        pass
    finally:
        for consumer in consumers:
            consumer.exit()
    write_config(conf)


if __name__ == "__main__":
    main()