srne-mqtt/srnemqtt/consumers/__init__.py

48 lines
1.0 KiB
Python

# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import Any, Dict
from ..srne import Srne
class BaseConsumer(ABC):
    """
    Abstract base class for data consumers.

    Concrete consumers must implement ``__init__``, ``write`` and ``poll``.
    Instances can be used as context managers: entering yields the consumer
    itself and leaving the ``with`` block calls ``exit()``.
    """

    # Settings dict most recently handed to config().
    settings: Dict[str, Any]
    # Srne object supplied at construction time.
    srne: Srne

    @abstractmethod
    def __init__(self, settings: Dict[str, Any], srne: Srne) -> None:
        """
        Store the Srne object, then apply the settings through ``config()``.

        Declared abstract so every subclass has to define its own
        constructor; a subclass can still run this body through
        ``super().__init__(settings, srne)``.
        """
        # srne is assigned first so an overridden config() can already use it.
        self.srne = srne
        self.config(settings)

    @abstractmethod
    def write(self, data: Dict[str, Any]):
        """
        Handle one batch of data and pass it on to its destination.
        Implementations should avoid blocking.
        """

    @abstractmethod
    def poll(self):
        """
        Run whenever there is down time.
        Periodic work of a consumer belongs here.
        Implementations should not block.
        """

    def exit(self):
        """
        Called on exit; override to clean up any open handles.
        The default implementation does nothing.
        """

    def config(self, settings: Dict[str, Any]):
        """
        Remember the given settings dict on the instance.
        """
        self.settings = settings

    def __enter__(self):
        """
        Enter the context manager; the consumer itself is the managed value.
        """
        return self

    def __exit__(self, etype, value, traceback):
        """
        Leave the context manager by delegating to ``exit()``.
        Nothing is returned, so an in-flight exception is not suppressed.
        """
        self.exit()