# -*- coding: utf-8 -*-
"""Load ``config.yaml`` and build the consumers and interface it names.

Run as a script, the module instantiates every configured consumer and
polls them once per second until interrupted.
"""
import importlib
import os
from time import sleep
from typing import Any, Dict, List, Optional, Type

import yaml

from .consumers import BaseConsumer
from .interfaces import BaseInterface

# The configuration lives in the current working directory; updates are
# written to a scratch file first and then renamed over the real one.
_CONFIG_PATH = "config.yaml"
_CONFIG_TMP_PATH = ".config.yaml~writing"


def get_consumer(name: str) -> Optional[Type[BaseConsumer]]:
    """Resolve a ``"module.ClassName"`` string to a consumer class.

    The part before the last dot is imported as a submodule of the
    ``consumers`` subpackage; the part after it is looked up as an attribute
    of that module.

    Raises:
        ValueError: if *name* contains no dot.
        ImportError: if the submodule cannot be imported.
        AttributeError: if the module has no attribute with that name.
        AssertionError: if the attribute is not a ``BaseConsumer`` subclass.
    """
    mod_name, cls_name = name.rsplit(".", 1)
    mod = importlib.import_module(f".consumers.{mod_name}", package=__package__)
    res = getattr(mod, cls_name)
    assert issubclass(res, BaseConsumer)
    return res


def get_config() -> Dict[str, Any]:
    """Read ``config.yaml`` and fill in defaults for missing sections.

    Guarantees a ``consumers`` mapping and a ``logging`` section shaped like a
    ``logging.config.dictConfig`` dictionary (version, one console handler,
    one formatter, and a ``root`` logger entry). Keys already present in the
    file are never overwritten.

    Returns:
        The parsed configuration with defaults applied in place.
    """
    with open(_CONFIG_PATH, "r", encoding="utf-8") as fh:
        # An empty YAML document parses to None; treat it as an empty config
        # instead of failing on the first setdefault() below.
        conf: dict = yaml.safe_load(fh) or {}

    conf.setdefault("consumers", {})

    log_conf = conf.setdefault("logging", {})
    log_conf.setdefault("version", 1)
    log_conf.setdefault("disable_existing_loggers", False)
    log_conf.setdefault(
        "handlers",
        {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": "default",
                "level": "INFO",
                "stream": "ext://sys.stdout",
            }
        },
    )
    # dictConfig expects {formatter_id: {...}}. The console handler above
    # refers to the formatter id "default", so the settings must be nested
    # under that key rather than placed directly in "formatters".
    log_conf.setdefault(
        "formatters",
        {
            "default": {
                "format": "%(asctime)s %(levelname)-8s %(name)-15s %(message)s",
                "datefmt": "%Y-%m-%d %H:%M:%S",
            }
        },
    )
    loggers = log_conf.setdefault("loggers", {})
    loggers.setdefault("root", {"handlers": ["console"], "level": "DEBUG"})
    return conf


def write_config(conf: Dict[str, Any]) -> None:
    """Persist *conf* to ``config.yaml`` without leaving a partial file.

    The YAML is written to a scratch file, flushed and fsynced, and only then
    renamed over the real configuration file.
    """
    with open(_CONFIG_TMP_PATH, "w", encoding="utf-8") as fh:
        yaml.safe_dump(conf, fh, indent=2, encoding="utf-8")
        fh.flush()
        os.fsync(fh.fileno())
    os.replace(_CONFIG_TMP_PATH, _CONFIG_PATH)


def get_consumers(conf: Optional[Dict[str, Any]] = None) -> List[BaseConsumer]:
    """Instantiate every consumer listed under ``conf["consumers"]``.

    Args:
        conf: Configuration mapping; loaded via ``get_config()`` when omitted.

    Returns:
        One consumer instance per entry, each constructed with that entry's
        configuration value as its only argument.
    """
    if conf is None:
        conf = get_config()

    consumers = []
    for name, consumer_config in conf["consumers"].items():
        consumer_cls = get_consumer(name)
        if consumer_cls:
            consumers.append(consumer_cls(consumer_config))

    # Write the configuration back so the defaults added by get_config() are
    # persisted (and, presumably, anything the consumer constructors changed
    # in their own section -- confirm against the consumer implementations).
    write_config(conf)
    return consumers


def _get_interface(name: str) -> Type[BaseInterface]:
    """Resolve a ``"module.ClassName"`` string to an interface class.

    Mirrors ``get_consumer`` but imports from the ``interfaces`` subpackage
    and checks the result against ``BaseInterface``.
    """
    mod_name, cls_name = name.rsplit(".", 1)
    mod = importlib.import_module(f".interfaces.{mod_name}", package=__package__)
    res = getattr(mod, cls_name)
    assert issubclass(res, BaseInterface)
    return res


def get_interface(conf: Optional[Dict[str, Any]] = None) -> BaseInterface:
    """Build the interface described by ``conf["interface"]``.

    ``conf["interface"]["name"]`` selects the class; the optional
    ``conf["interface"]["params"]`` mapping is passed as keyword arguments.

    Args:
        conf: Configuration mapping; loaded via ``get_config()`` when omitted.
    """
    if conf is None:
        conf = get_config()

    interface_conf = conf["interface"]
    name = interface_conf["name"]
    params = interface_conf.get("params", {})

    interface_cls = _get_interface(name)
    assert interface_cls
    return interface_cls(**params)


if __name__ == "__main__":
    conf = get_config()
    consumers = get_consumers(conf)
    try:
        while True:
            for consumer in consumers:
                consumer.poll()
            sleep(1)
    except (KeyboardInterrupt, SystemExit, Exception) as e:
        # Give every consumer a chance to shut down, whatever stopped the loop.
        for consumer in consumers:
            consumer.exit()
        # Only Ctrl-C counts as a clean stop; anything else propagates.
        if not isinstance(e, KeyboardInterrupt):
            raise
        write_config(conf)