markqvist___Sideband/docs/example_plugins/raspi_telemetry.py
2026-04-27 16:19:55 +02:00

152 lines
6.3 KiB
Python

# This is a basic Raspberry Pi telemetry plugin
# example that you can build upon to
# implement your own telemetry plugins.
import os
import RNS
import time
import psutil
import shutil
from threading import Thread
import urllib.request, json
class RasPiTelemetryPlugin(SidebandTelemetryPlugin):
plugin_name = "raspi_telemetry"
def start(self):
# Do any initialisation work here
RNS.log("Raspberry Pi Telemetry plugin starting...")
self.initialise_values()
self.power_stats = False
self.storage_stats = True
self.target_disk = {"blkid": "mmcblk0p2", "label": "SD Card"}
self.should_run = True
self.update_thread = Thread(target=self.update_job, daemon=True)
self.update_thread.start()
# And finally call start on superclass
super().start()
def stop(self):
# Do any teardown work here
self.should_run = False
# And finally call stop on superclass
super().stop()
def initialise_values(self):
self.battery_percent = None
self.power_production = None
self.power_consumption = None
self.battery_charging = None
self.battery_temperature = None
self.uptime = None
def update_job(self):
while self.should_run:
try:
# Update uptime
self.uptime = RNS.prettytime(time.time()-psutil.boot_time())
# Update power values if enabled
if self.power_stats:
with urllib.request.urlopen("http://some_host/status.json") as url:
data = json.loads(url.read().decode())
self.power_production = data["solar_yield"]
self.power_consumption = data["inverter_load"]+data["dc_consumption"]
self.battery_charging = data["battery_current"] >= 0.0
self.battery_charge = data["battery_charge"]
self.battery_temperature = data["battery_temperature"]
except Exception as e:
RNS.log("Error while updating plugin telemetry: "+str(e), RNS.LOG_ERROR)
time.sleep(15)
def update_telemetry(self, telemeter):
if telemeter != None:
if self.power_stats:
# Create power consumption sensor
telemeter.synthesize("power_consumption")
telemeter.sensors["power_consumption"].update_consumer(self.power_consumption, type_label="Power consumption")
# Create power production sensor
telemeter.synthesize("power_production")
telemeter.sensors["power_production"].update_producer(self.power_production, type_label="Solar production", custom_icon="solar-power-variant")
# Create battery sensor
telemeter.synthesize("battery")
telemeter.sensors["battery"].data = {"charge_percent": round(self.battery_charge, 1), "charging": self.battery_charging}
# Create NVM sensor if enabled
if self.storage_stats:
mount_point = None
for partition in psutil.disk_partitions(all=False):
if self.target_disk["blkid"] in partition.device:
mount_point = partition.mountpoint
break
if mount_point:
st = shutil.disk_usage(mount_point)
telemeter.synthesize("nvm")
telemeter.sensors["nvm"].update_entry(capacity=st.total, used=st.used, type_label=self.target_disk["label"])
# Create RAM sensors
ms = psutil.virtual_memory()
telemeter.synthesize("ram")
telemeter.sensors["ram"].update_entry(capacity=ms.total, used=ms.used, type_label="RAM")
# Create CPU sensor
a = psutil.getloadavg()
cps = 0; cpms = 5
for m in range(cpms):
cps += psutil.cpu_percent()/100.0
time.sleep(0.05)
cp = cps/cpms
telemeter.synthesize("processor")
telemeter.sensors["processor"].update_entry(current_load=cp, clock=round(psutil.cpu_freq().current*1e6, 0), load_avgs=[a[0], a[1], a[2]], type_label="CPU")
# Create custom sensor for uptime
telemeter.synthesize("custom")
telemeter.sensors["custom"].update_entry(self.uptime, type_label="Uptime is", custom_icon="timer-refresh-outline")
# Read temperature using built-in sensor
try:
cpu_temp_path = '/sys/class/thermal/thermal_zone0/temp'
with open(cpu_temp_path) as f: temp = int(f.read().strip()) / 1000.0
telemeter.synthesize("temperature")
telemeter.sensors["temperature"].data = {"c": round(temp, 2)}
except Exception as e: RNS.log(f"Getting temperature reading failed: {e}", RNS.LOG_ERROR)
# Network Interfaces
try:
for iface in os.listdir('/sys/class/net'):
carrier = None
speed = None
try:
with open(f'/sys/class/net/{iface}/carrier') as f: carrier = int(f.read().strip()) == 1
except FileNotFoundError: pass
if iface.startswith('eth') or iface.startswith('wlan'):
try:
with open(f'/sys/class/net/{iface}/speed') as f: speed = int(f.read().strip())
except (FileNotFoundError, ValueError): pass
telemeter.synthesize("custom")
speed_str = f" at {RNS.prettyspeed(speed*1e6)}" if speed else ""
sensors_str = f"carrier{speed_str}" if carrier else "has no carrier"
telemeter.sensors["custom"].update_entry(value=f"{sensors_str}", type_label=f"{iface}", custom_icon="lan" if carrier else "lan-disconnect")
except Exception as e: RNS.log(f"Getting network stats failed: {e}", RNS.LOG_DEBUG)
# Finally, tell Sideband what class in this
# file is the actual plugin class.
plugin_class = RasPiTelemetryPlugin