srne-mqtt/srnemqtt/util.py

60 lines
1.5 KiB
Python

# -*- coding: utf-8 -*-
import time
from logging import getLogger
from typing import Optional
# Only factor of 1000
SI_PREFIXES_LARGE = "kMGTPEZY"
SI_PREFIXES_SMALL = "mµnpfazy"
logger = getLogger(__name__)
def humanize_number(data, unit: str = ""):
counter = 0
while data >= 1000:
data /= 1000
counter += 1
if counter >= len(SI_PREFIXES_LARGE):
break
while data < 1:
data *= 1000
counter -= 1
if abs(counter) >= len(SI_PREFIXES_SMALL):
break
if not counter:
prefix = ""
elif counter > 0:
prefix = SI_PREFIXES_LARGE[counter - 1]
elif counter < 0:
prefix = SI_PREFIXES_SMALL[abs(counter) - 1]
return f"{data:.3g} {prefix}{unit}"
class Periodical:
prev: float
interval: float
def __init__(self, interval: float, start: Optional[float] = None):
self.prev = time.time() - interval if start is None else start
self.interval = interval
def __call__(self, now: Optional[float] = None) -> bool:
if now is None:
now = time.time()
if (now - self.prev) >= self.interval:
skipped, overshoot = divmod(now - self.prev, self.interval)
skipped -= 1
if skipped:
logger.debug(
"Skipped:", skipped, overshoot, now - self.prev, self.interval
)
self.prev = now - overshoot
return True
return False