# -*- coding: utf-8 -*-
"""Replay a solar charge-controller log into an RRD file and render graphs.

``re_read()`` (re)creates ``test.rrd`` and feeds it from ``z_solar.log``.
Importing/running this module creates the ``graphs`` directory and renders
one PNG per data source listed in ``datapoints`` from the existing RRD file.
"""
import datetime
import os
from ast import literal_eval
from collections import namedtuple
from typing import Any, Dict

import rrdtool  # type: ignore

from srnemqtt.solar_types import DataName

# Timestamp layout used at the start of every log line.
DT_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

# RRD start time: one second before the hard-coded timestamp below, so that
# an update carrying exactly that timestamp is accepted by rrdtool.
START = (
    int(datetime.datetime.strptime("2021-10-31 19:28:17.502365", DT_FORMAT).timestamp())
    - 1
)

# 2021-11-12 16:58:32.262030
# NOTE(review): the timestamp above presumably points at a sample log record
# carrying these keys - confirm against the log.
HISTORICAL_KEYS = {
    DataName.BATTERY_VOLTAGE_MIN,
    DataName.BATTERY_VOLTAGE_MAX,
    DataName.CHARGE_MAX_CURRENT,
    DataName._DISCHARGE_MAX_CURRENT,
    DataName.CHARGE_MAX_POWER,
    DataName.DISCHARGE_MAX_POWER,
    DataName.CHARGE_AMP_HOUR,
    DataName.DISCHARGE_AMP_HOUR,
    DataName.PRODUCTION_ENERGY,
    DataName.CONSUMPTION_ENERGY,
    DataName.RUN_DAYS,
    DataName.DISCHARGE_COUNT,
    DataName.FULL_CHARGE_COUNT,
    DataName.TOTAL_CHARGE_AMP_HOURS,
    DataName.TOTAL_DISCHARGE_AMP_HOURS,
    DataName.TOTAL_PRODUCTION_ENERGY,
    DataName.TOTAL_CONSUMPTION_ENERGY,
}

# 2021-11-12 16:58:47.521142
# NOTE(review): same as above - presumably a sample record with these keys.
INSTANT_KEYS = {
    DataName.BATTERY_CHARGE,
    DataName.BATTERY_VOLTAGE,
    DataName.BATTERY_CURRENT,
    DataName.INTERNAL_TEMPERATURE,
    DataName.BATTERY_TEMPERATURE,
    DataName.LOAD_VOLTAGE,
    DataName.LOAD_CURRENT,
    DataName.LOAD_POWER,
    DataName.PANEL_VOLTAGE,
    DataName.PANEL_CURRENT,
    DataName.PANEL_POWER,
    DataName.LOAD_ENABLED,
}

KNOWN_KEYS = HISTORICAL_KEYS.union(INSTANT_KEYS)

# Renames applied to keys read from the log (old/alternative name -> name
# used by the rest of this script).
MAP = {
    "_internal_temperature?": "internal_temp",
    "unknown1": "charge_max_current",
    "unknown2": "_discharge_max_current?",
    "internal_temperature": "internal_temp",
    "battery_temperature": "battery_temp",
}


def map_keys(d: dict) -> dict:
    """Return a copy of *d* with every key found in ``MAP`` renamed.

    Keys that are not in ``MAP`` are kept unchanged. If two keys map to the
    same name, the one iterated last wins.
    """
    return {MAP.get(k, k): v for k, v in d.items()}


# One rrdtool data source: the fields mirror the parts of a
# ``DS:name:type:heartbeat:min:max`` definition string.
DS = namedtuple("DS", ("name", "type", "heartbeat", "min", "max"))


def _DS2str(self: DS) -> str:
    """Format a ``DS`` tuple as an rrdtool data-source definition string."""
    return f"DS:{self.name}:{self.type}:{self.heartbeat}:{self.min}:{self.max}"


# Mypy expects object, not DS as first argument
DS.__str__ = _DS2str  # type: ignore

# Data sources stored in the RRD file. The order matters: ``rrdupdate``
# emits values in exactly this order.
datapoints = [
    DS("internal_temp", "GAUGE", "60s", "-70", "126"),
    DS("battery_temp", "GAUGE", "60s", "-70", "126"),
    DS("battery_charge", "GAUGE", "60s", "0", "100"),
    DS("battery_voltage", "GAUGE", "60s", "0", "40"),
    DS("battery_current", "GAUGE", "60s", "-30", "30"),
    DS("load_voltage", "GAUGE", "60s", "0", "40"),
    DS("load_current", "GAUGE", "60s", "-30", "30"),
    DS("load_power", "GAUGE", "60s", "-800", "800"),
    DS("load_enabled", "GAUGE", "60s", "0", "1"),
    DS("panel_voltage", "GAUGE", "60s", "0", "120"),
    DS("panel_current", "GAUGE", "60s", "-30", "30"),
    DS("panel_power", "GAUGE", "60s", "-800", "800"),
]


def parse_log(fh):
    """Yield ``(timestamp, data)`` for every dict record found in a log file.

    Each usable line has the form ``<date> <time> <text>`` where
    ``<date> <time>`` matches ``DT_FORMAT`` and ``<text>`` is a Python dict
    literal. Keys of the dict are renamed through ``map_keys``.

    Lines that do not have three space-separated fields, whose timestamp or
    payload cannot be parsed, or whose payload is not a dict are skipped;
    parse errors are printed. Lines whose text is not wrapped in braces are
    ignored silently.

    Args:
        fh: An iterable of text lines (e.g. an open text file).

    Yields:
        Tuples of ``(datetime.datetime, dict)``.
    """
    # address = None
    for line in fh:
        parts = line.strip().split(" ", 2)
        if len(parts) != 3:
            # Not "<date> <time> <text>"; nothing to parse.
            continue
        date, time, text = parts
        # print(date, time, text)

        try:
            dt = datetime.datetime.strptime(" ".join([date, time]), DT_FORMAT)
        except ValueError as e:
            # Without a valid timestamp the record cannot be placed in time;
            # skip it instead of reusing the previous line's timestamp.
            print(e)
            continue

        if not (text.startswith("{") and text.endswith("}")):
            continue

        try:
            # literal_eval only evaluates literals, so log content cannot
            # execute code. It raises ValueError for non-literal nodes and
            # SyntaxError for malformed input.
            parsed = literal_eval(text)
        except (SyntaxError, ValueError) as e:
            print(e)
            continue

        if not isinstance(parsed, dict):
            # e.g. a set literal such as "{1, 2}" also passes the brace check.
            continue

        yield (dt, map_keys(parsed))

        # elif text.startswith("Reading"):
        #     _, addr_txt = text.split(" ")
        #     address = int(addr_txt.strip("."), 16)


RRDFILE = "test.rrd"


# feed updates to the database
# rrdtool.update("test.rrd", "N:32")
def rrdupdate(file: str, timestamp: int, data: dict):
    """Write one sample to the RRD file.

    Builds a ``timestamp:v1:v2:...`` update string with one value per entry
    of ``datapoints`` (in that order) and passes it to ``rrdtool.update``.

    Args:
        file: Path of the RRD file to update.
        timestamp: Sample time, placed first in the update string.
        data: Mapping from data-source name to value. Missing names are
            written as ``"U"``; booleans are written as ``0``/``1``.
    """
    fields = [timestamp]
    fields.extend(data.get(ds.name, "U") for ds in datapoints)
    update = ":".join(
        str(int(x)) if isinstance(x, bool) else str(x) for x in fields
    )
    # print(update)
    rrdtool.update(file, update)


def re_read():
    """Recreate ``RRDFILE`` and fill it from ``z_solar.log``.

    Records from the log are merged into one sample until a record carrying
    ``panel_voltage`` or ``battery_voltage`` arrives with a different
    whole-second timestamp; the merged sample is then written under the
    previous timestamp and a new sample is started.
    """
    rrdtool.create(
        RRDFILE,
        # "--no-overwrite",
        "--start",
        str(START),
        "--step=60s",
        # Full resolution (1 minute) for 7 days
        "RRA:MIN:0.20:1:7d",
        "RRA:MAX:0.20:1:7d",
        "RRA:LAST:0.20:1:7d",
        "RRA:AVERAGE:0.20:1:7d",
        *[str(ds) for ds in datapoints],
    )

    with open("z_solar.log", "r") as fh:
        dt_ep_last = 0
        # Initialised so the final flush cannot hit an unbound name when no
        # record with a voltage key was ever seen.
        dt_ep = 0
        data: Dict[str, Any] = {}
        for dt, d in parse_log(fh):
            # print(dt, d)
            if "panel_voltage" in d or "battery_voltage" in d:
                dt_ep = int(dt.timestamp())
                if not dt_ep_last:
                    dt_ep_last = dt_ep
                if dt_ep_last != dt_ep:
                    if not data:
                        continue
                    rrdupdate(RRDFILE, dt_ep_last, data)
                    data.clear()
                    dt_ep_last = dt_ep
                # print(d)
                # exit()
            data.update(d)
        # Flush the last, still pending sample (only if it has a timestamp).
        if data and dt_ep:
            rrdupdate(RRDFILE, dt_ep, data)


# Uncomment to rebuild the RRD file from the log before graphing.
# re_read()

# Render one graph per data source, covering the last week.
os.makedirs("graphs", exist_ok=True)
for ds in datapoints:
    rrdtool.graph(
        f"graphs/{ds.name}.png",
        "--start=-1w",
        f"--title={ds.name}",
        f"DEF:{ds.name}={RRDFILE}:{ds.name}:AVERAGE",
        f"LINE:{ds.name}#000000:{ds.name}",
        # "LINE:panel_voltage#ff0000:Panel voltage",
        # "LINE:panel_power#00ff00:Panel power",
    )