srne-mqtt/srnemqtt/util.py

90 lines
2.1 KiB
Python

# -*- coding: utf-8 -*-
import json
import time
from logging import Filter as LoggingFilter
from logging import getLogger
from typing import Dict, Optional
# Public names exported by ``from <this module> import *``.
__all__ = ["humanize_number", "Periodical", "LazyJSON", "LoggingDictFilter"]
# Only factor of 1000
# Both strings are indexed by "number of factor-1000 steps minus one":
# SI_PREFIXES_LARGE[0] is the prefix for 1000**1, SI_PREFIXES_SMALL[0] the
# prefix for 1000**-1, and so on (see humanize_number).
SI_PREFIXES_LARGE = "kMGTPEZY"
SI_PREFIXES_SMALL = "mµnpfazy"
# Module-level logger; Periodical uses it to report skipped intervals.
logger = getLogger(__name__)
def humanize_number(data, unit: str = ""):
    """Format a number with an SI prefix and three significant digits.

    The value is scaled in steps of 1000 until its magnitude lies in
    ``[1, 1000)`` (or the largest/smallest known prefix is reached) and is
    then rendered as ``"<value> <prefix><unit>"``.  The separating space is
    always present, so a value with neither prefix nor unit ends in a space.

    Args:
        data: The number to format.  Zero and negative values are supported.
        unit: Unit suffix appended directly after the prefix.

    Returns:
        The formatted string, e.g. ``"1.5 kW"`` for ``(1500, "W")``.
    """
    # Scale the magnitude only and re-apply the sign at the end; comparing a
    # negative value against 1 would otherwise scale it "up" until the
    # smallest prefix is reached.
    negative = data < 0
    magnitude = -data if negative else data
    exponent = 0  # number of factor-1000 steps; negative for sub-unit values

    # Zero can never be scaled into [1, 1000); leave it without a prefix.
    if magnitude:
        while magnitude >= 1000 and exponent < len(SI_PREFIXES_LARGE):
            magnitude /= 1000
            exponent += 1
        while magnitude < 1 and -exponent < len(SI_PREFIXES_SMALL):
            magnitude *= 1000
            exponent -= 1
        # ``.3g`` may round e.g. 999.6 up to 1000, which would be printed as
        # "1e+03"; move one prefix up so the output reads "1 k" instead.
        if float(f"{magnitude:.3g}") >= 1000 and exponent < len(SI_PREFIXES_LARGE):
            magnitude /= 1000
            exponent += 1

    if exponent > 0:
        prefix = SI_PREFIXES_LARGE[exponent - 1]
    elif exponent < 0:
        prefix = SI_PREFIXES_SMALL[-exponent - 1]
    else:
        prefix = ""

    value = -magnitude if negative else magnitude
    return f"{value:.3g} {prefix}{unit}"
class LazyJSON:
    """Wrapper that serialises its payload to JSON only when stringified.

    The payload is stored as-is; ``json.dumps`` runs each time ``str()`` is
    applied to the wrapper, so no serialisation cost is paid if the string
    form is never requested.
    """

    def __init__(self, data):
        # Keep a reference only; nothing is encoded at construction time.
        self.data = data

    def __str__(self) -> str:
        """Return the payload encoded as a JSON document."""
        payload = self.data
        return json.dumps(payload)

    def __repr__(self) -> str:
        """Return the ``repr`` of the wrapped payload itself."""
        return "{!r}".format(self.data)
class LoggingDictFilter(LoggingFilter):
    """Logging filter that copies the entries of ``data`` onto each record.

    Every key/value pair currently in ``data`` is set as an attribute on the
    record passed to :meth:`filter`.  The filter never rejects a record.
    """

    # Attribute name -> value to attach to every record; mutate it freely
    # after construction.
    data: Dict[str, str]

    def __init__(self):
        # Initialise the base Filter so its ``name``/``nlen`` attributes exist.
        super().__init__()
        self.data = {}

    def filter(self, record):
        """Attach all ``data`` entries to ``record`` and accept the record.

        Args:
            record: The log record to annotate.

        Returns:
            Always ``True``, so the record is never dropped.

        Raises:
            AssertionError: If ``record`` already has an attribute named like
                one of the keys (not checked when Python runs with ``-O``).
        """
        for key, value in self.data.items():
            # Refuse to overwrite an attribute the record already carries.
            assert not hasattr(record, key), (
                f"log record already has attribute {key!r}"
            )
            setattr(record, key, value)
        return True
class Periodical:
    """Callable that reports whether ``interval`` has elapsed since it last fired.

    Call the instance repeatedly; it returns ``True`` when at least
    ``interval`` has passed since ``prev`` and ``False`` otherwise.  When it
    fires, ``prev`` is moved to the most recent interval boundary
    (``now - overshoot``) rather than to ``now``.
    """

    # Timestamp of the interval boundary at which the instance last fired.
    prev: float
    # Minimum time between two ``True`` results, in the unit of ``time.time()``.
    interval: float

    def __init__(self, interval: float, start: Optional[float] = None):
        """Create a periodical.

        Args:
            interval: Time that must elapse between two ``True`` results.
            start: Initial value of ``prev``.  When omitted, ``prev`` is set
                one interval into the past so the first call fires at once.
        """
        self.prev = time.time() - interval if start is None else start
        self.interval = interval

    def __call__(self, now: Optional[float] = None) -> bool:
        """Return ``True`` if the interval has elapsed, updating ``prev``.

        Args:
            now: Current timestamp; defaults to ``time.time()``.

        Returns:
            ``True`` when at least ``interval`` has passed since ``prev``,
            otherwise ``False``.
        """
        if now is None:
            now = time.time()

        # A non-positive interval means "always due"; it also must not reach
        # divmod(), which raises ZeroDivisionError for a zero interval.
        if self.interval <= 0:
            self.prev = now
            return True

        elapsed = now - self.prev
        if elapsed >= self.interval:
            skipped, overshoot = divmod(elapsed, self.interval)
            # One whole interval is the expected case; anything beyond that
            # was missed.
            skipped -= 1
            if skipped:
                # Arguments are passed separately so logging formats lazily.
                logger.debug(
                    "Skipped: %s %s %s %s",
                    skipped,
                    overshoot,
                    elapsed,
                    self.interval,
                )
            self.prev = now - overshoot
            return True
        return False