srne-mqtt/srnemqtt/consumers/__init__.py

43 lines
995 B
Python

# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import Any, Dict
class BaseConsumer(ABC):
    """
    Abstract base for consumers that receive data dictionaries.

    Concrete subclasses must implement ``__init__``, ``write`` and ``poll``.
    Instances are context managers: entering yields the consumer itself and
    leaving the ``with`` block calls ``exit()``.
    """

    # Settings mapping stored by config().
    settings: Dict[str, Any]

    @abstractmethod
    def __init__(self, settings: Dict[str, Any]) -> None:
        """
        Store the given settings by delegating to ``config()``.

        Declared abstract, so subclasses have to provide their own
        ``__init__``; they can reach this one through ``super()``.
        """
        self.config(settings)

    @abstractmethod
    def write(self, data: Dict[str, Any]):
        """
        Handle one data dictionary and forward it to its destination.
        Implementations should avoid blocking.
        """

    @abstractmethod
    def poll(self):
        """
        Hook that is run whenever there is down time.
        Periodic work a consumer needs to do belongs here.
        Implementations should not block.
        """

    def exit(self):
        """
        Cleanup hook called on exit; release any handles here.
        The base implementation does nothing.
        """

    def config(self, settings: Dict[str, Any]):
        """
        Remember ``settings`` on the instance as ``self.settings``.
        """
        self.settings = settings

    def __enter__(self):
        """
        Enter the context and hand back this consumer.
        """
        return self

    def __exit__(self, etype, value, traceback):
        """
        Leave the context by running ``exit()``.

        Nothing is returned, so an exception raised inside the ``with``
        block is not suppressed.
        """
        self.exit()