srne-mqtt/misc/render_rrd.py

216 lines
5.9 KiB
Python

# -*- coding: utf-8 -*-
import datetime
import os
from ast import literal_eval
from collections import namedtuple
from typing import Any, Dict
import rrdtool # type: ignore
from srnemqtt.solar_types import DataName
DT_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
START = (
int(datetime.datetime.strptime("2021-10-31 19:28:17.502365", DT_FORMAT).timestamp())
- 1
)
# 2021-11-12 16:58:32.262030
HISTORICAL_KEYS = {
DataName.BATTERY_VOLTAGE_MIN,
DataName.BATTERY_VOLTAGE_MAX,
DataName.CHARGE_MAX_CURRENT,
DataName.DISCHARGE_MAX_CURRENT,
DataName.CHARGE_MAX_POWER,
DataName.DISCHARGE_MAX_POWER,
DataName.CHARGE_AMP_HOUR,
DataName.DISCHARGE_AMP_HOUR,
DataName.PRODUCTION_ENERGY,
DataName.CONSUMPTION_ENERGY,
DataName.RUN_DAYS,
DataName.DISCHARGE_COUNT,
DataName.FULL_CHARGE_COUNT,
DataName.TOTAL_CHARGE_AMP_HOURS,
DataName.TOTAL_DISCHARGE_AMP_HOURS,
DataName.TOTAL_PRODUCTION_ENERGY,
DataName.TOTAL_CONSUMPTION_ENERGY,
}
# 2021-11-12 16:58:47.521142
INSTANT_KEYS = {
DataName.BATTERY_CHARGE,
DataName.BATTERY_VOLTAGE,
DataName.BATTERY_CURRENT,
DataName.INTERNAL_TEMPERATURE,
DataName.BATTERY_TEMPERATURE,
DataName.LOAD_VOLTAGE,
DataName.LOAD_CURRENT,
DataName.LOAD_POWER,
DataName.PANEL_VOLTAGE,
DataName.PANEL_CURRENT,
DataName.PANEL_POWER,
DataName.LOAD_ENABLED,
}
KNOWN_KEYS = HISTORICAL_KEYS.union(INSTANT_KEYS)
MAP = {
"_internal_temperature?": "internal_temp",
"unknown1": "charge_max_current",
"unknown2": "discharge_max_current",
"internal_temperature": "internal_temp",
"battery_temperature": "battery_temp",
}
def map_keys(d: dict) -> dict:
res = {}
for k, v in d.items():
if k in MAP:
k = MAP[k]
res[k] = v
return res
DS = namedtuple("DS", ("name", "type", "heartbeat", "min", "max"))
def _DS2str(self: DS) -> str:
return f"DS:{self.name}:{self.type}:{self.heartbeat}:{self.min}:{self.max}"
# Mypy expects object, not DS as first argument
DS.__str__ = _DS2str # type: ignore
datapoints = [
DS("internal_temp", "GAUGE", "60s", "-70", "126"),
DS("battery_temp", "GAUGE", "60s", "-70", "126"),
DS("battery_charge", "GAUGE", "60s", "0", "100"),
DS("battery_voltage", "GAUGE", "60s", "0", "40"),
DS("battery_current", "GAUGE", "60s", "-30", "30"),
DS("load_voltage", "GAUGE", "60s", "0", "40"),
DS("load_current", "GAUGE", "60s", "-30", "30"),
DS("load_power", "GAUGE", "60s", "-800", "800"),
DS("load_enabled", "GAUGE", "60s", "0", "1"),
DS("panel_voltage", "GAUGE", "60s", "0", "120"),
DS("panel_current", "GAUGE", "60s", "-30", "30"),
DS("panel_power", "GAUGE", "60s", "-800", "800"),
]
def parse_log(fh):
# address = None
for line in fh.readlines():
if " " not in line:
continue
date, time, text = line.strip().split(" ", 2)
# print(date, time, text)
try:
dt = datetime.datetime.strptime(" ".join([date, time]), DT_FORMAT)
except ValueError as e:
print(e)
if text.startswith("{") and text.endswith("}"):
try:
data = map_keys(literal_eval(text))
except SyntaxError as e:
print(e)
for key in data.keys():
if key not in KNOWN_KEYS:
if type(key) is int:
continue
yield (dt, data)
# elif text.startswith("Reading"):
# _, addr_txt = text.split(" ")
# address = int(addr_txt.strip("."), 16)
RRDFILE = "test.rrd"
# feed updates to the database
# rrdtool.update("test.rrd", "N:32")
def rrdupdate(file: str, timestamp: int, data: dict):
res = [timestamp]
for ds in datapoints:
res.append(data.get(ds.name, "U"))
update = ":".join([str(int(x)) if type(x) is bool else str(x) for x in res])
# print(update)
rrdtool.update(file, update)
def re_read():
rrdtool.create(
RRDFILE,
# "--no-overwrite",
"--start",
str(START),
"--step=60s",
# Full resolution (1 minute) for 7 days
"RRA:MIN:0.20:1:7d",
"RRA:MAX:0.20:1:7d",
"RRA:LAST:0.20:1:7d",
"RRA:AVERAGE:0.20:1:7d",
*[str(ds) for ds in datapoints],
)
with open("z_solar.log", "r") as fh:
dt_ep_last = 0
data: Dict[str, Any] = {}
for dt, d in parse_log(fh):
# print(dt, d)
if "panel_voltage" in d or "battery_voltage" in d:
dt_ep = int(dt.timestamp())
if not dt_ep_last:
dt_ep_last = dt_ep
if dt_ep_last != dt_ep:
if not data:
continue
rrdupdate(RRDFILE, dt_ep_last, data)
data.clear()
dt_ep_last = dt_ep
# print(d)
# exit()
data.update(d)
if data:
rrdupdate(RRDFILE, dt_ep, data)
# re_read()
# DS("internal_temp", "GAUGE", "60s", "-70", "126")
# DS("battery_temp", "GAUGE", "60s", "-70", "126")
# DS("battery_charge", "GAUGE", "60s", "0", "100")
# DS("battery_voltage", "GAUGE", "60s", "0", "40")
# DS("battery_current", "GAUGE", "60s", "-30", "30")
# DS("load_voltage", "GAUGE", "60s", "0", "40")
# DS("load_current", "GAUGE", "60s", "-30", "30")
# DS("load_power", "GAUGE", "60s", "-800", "800")
# DS("load_enabled", "GAUGE", "60s", "0", "1")
# DS("panel_voltage", "GAUGE", "60s", "0", "120")
# DS("panel_current", "GAUGE", "60s", "-30", "30")
# DS("panel_power", "GAUGE", "60s", "-800", "800")
os.makedirs("graphs", exist_ok=True)
for ds in datapoints:
rrdtool.graph(
f"graphs/{ds.name}.png",
"--start=-1w",
f"--title={ds.name}",
f"DEF:{ds.name}={RRDFILE}:{ds.name}:AVERAGE",
f"LINE:{ds.name}#000000:{ds.name}",
# "LINE:panel_voltage#ff0000:Panel voltage",
# "LINE:panel_power#00ff00:Panel power",
)