# -*- coding: utf-8 -*- from abc import ABC, abstractmethod from typing import Any, Dict from ..srne import Srne class BaseConsumer(ABC): settings: Dict[str, Any] srne: Srne @abstractmethod def __init__(self, settings: Dict[str, Any], srne: Srne) -> None: self.srne = srne self.config(settings) @abstractmethod def write(self, data: Dict[str, Any]): """ Process and send data to wherever it is going. Avoid blocking. """ pass @abstractmethod def poll(self): """ This function will be ran whenever there is down time. If your consumer needs to do something periodically, do so here. This function should not block. """ pass def exit(self): """ Called on exit, clean up your handles here """ pass def config(self, settings: Dict[str, Any]): self.settings = settings def __enter__(self): return self def __exit__(self, etype, value, traceback): self.exit()