mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-04-27 14:20:35 +00:00
687 lines
34 KiB
Python
687 lines
34 KiB
Python
#!/usr/bin/env python3
|
||
|
||
# Reticulum License
|
||
#
|
||
# Copyright (c) 2016-2025 Mark Qvist
|
||
#
|
||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
# of this software and associated documentation files (the "Software"), to deal
|
||
# in the Software without restriction, including without limitation the rights
|
||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||
# copies of the Software, and to permit persons to whom the Software is
|
||
# furnished to do so, subject to the following conditions:
|
||
#
|
||
# - The Software shall not be used in any kind of system which includes amongst
|
||
# its functions the ability to purposefully do harm to human beings.
|
||
#
|
||
# - The Software shall not be used, directly or indirectly, in the creation of
|
||
# an artificial intelligence, machine learning or language model training
|
||
# dataset, including but not limited to any use that contributes to the
|
||
# training or development of such a model or algorithm.
|
||
#
|
||
# - The above copyright notice and this permission notice shall be included in
|
||
# all copies or substantial portions of the Software.
|
||
#
|
||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||
# SOFTWARE.
|
||
|
||
import RNS
|
||
import os
|
||
import sys
|
||
import time
|
||
import argparse
|
||
import io
|
||
|
||
from RNS._version import __version__
|
||
|
||
def size_str(num, suffix='B'):
    """Format a size as a human-readable string using decimal (1000-based) prefixes.

    :param num: The quantity to format. Interpreted as a count of bytes.
    :param suffix: Unit suffix appended after the prefix. When it is ``'b'``
        the value is multiplied by 8 (bytes to bits) before formatting.
    :returns: A string such as ``"999 B"``, ``"1.50 KB"`` or ``"5.00 YB"``.
        Unprefixed values are printed without decimals, prefixed values
        with two decimals.
    """
    units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']
    last_unit = 'Y'

    if suffix == 'b':
        num *= 8

    for unit in units:
        if abs(num) < 1000.0:
            if unit == "":
                return "%.0f %s%s" % (num, unit, suffix)
            return "%.2f %s%s" % (num, unit, suffix)
        num /= 1000.0

    # Anything that overflows the table is expressed in the largest prefix.
    # The separating space matches every other branch and speed_str().
    return "%.2f %s%s" % (num, last_unit, suffix)
|
||
|
||
# Module-level hand-off between the link/request callbacks and the waiting
# loop in get_remote_status().
request_result = None
request_concluded = False

def get_remote_status(destination_hash, include_lstats, identity, no_output=False, timeout=RNS.Transport.PATH_REQUEST_TIMEOUT):
    """Request interface statistics from a remote transport instance.

    Requests a path to ``destination_hash`` if none is known, establishes a
    link to the "rnstransport.remote.management" destination, identifies
    with ``identity`` and sends a "/status" request, then blocks until the
    request has concluded.

    :param destination_hash: Hash of the remote management destination.
    :param include_lstats: Passed to the remote in the request data.
    :param identity: Identity used to identify on the link.
    :param no_output: When true, progress messages are suppressed.
    :param timeout: Seconds to wait for the path request before giving up.
    :returns: ``(status, link_count)`` when a list response was received
        (``link_count`` is ``None`` if the list has a single element),
        otherwise ``None``.

    Exits the calling thread with status 12 if the path request times out.
    The link-closed callback exits with status 10.
    """
    global request_result, request_concluded

    # Start from a clean slate on every call. Without this, a second call
    # (e.g. from monitor mode) would see request_concluded still set from
    # the previous call and immediately return the stale result.
    request_result = None
    request_concluded = False

    clear_line = "\r \r"

    def say(message, end="\n", flush=False):
        # Overwrite the current progress line with a new message.
        if no_output:
            return
        print(clear_line, end="")
        print(message, end=end)
        if flush:
            sys.stdout.flush()

    if not RNS.Transport.has_path(destination_hash):
        if not no_output:
            print("Path to "+RNS.prettyhexrep(destination_hash)+" requested", end=" ")
            sys.stdout.flush()

        RNS.Transport.request_path(destination_hash)
        pr_time = time.time()
        while not RNS.Transport.has_path(destination_hash):
            time.sleep(0.1)
            if time.time() - pr_time > timeout:
                say("Path request timed out")
                sys.exit(12)

    remote_identity = RNS.Identity.recall(destination_hash)

    def remote_link_closed(link):
        if link.teardown_reason == RNS.Link.TIMEOUT:
            say("The link timed out, exiting now")
        elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED:
            say("The link was closed by the server, exiting now")
        else:
            say("Link closed unexpectedly, exiting now")

        # NOTE(review): if RNS invokes this callback on a non-main thread,
        # SystemExit only ends that thread and the wait loop below keeps
        # running - confirm against the RNS callback threading model.
        sys.exit(10)

    def request_failed(request_receipt):
        global request_result, request_concluded
        say("The remote status request failed. Likely authentication failure.")
        request_concluded = True

    def got_response(request_receipt):
        global request_result, request_concluded
        response = request_receipt.response
        if isinstance(response, list):
            status = response[0]
            link_count = response[1] if len(response) > 1 else None
            request_result = (status, link_count)

        request_concluded = True

    def remote_link_established(link):
        say("Sending request...", end=" ", flush=True)
        link.identify(identity)
        link.request("/status", data=[include_lstats], response_callback=got_response, failed_callback=request_failed)

    say("Establishing link with remote transport instance...", end=" ", flush=True)

    remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "rnstransport", "remote", "management")
    link = RNS.Link(remote_destination)
    link.set_link_established_callback(remote_link_established)
    link.set_link_closed_callback(remote_link_closed)

    # Callbacks flip request_concluded; poll until one of them has run.
    while not request_concluded:
        time.sleep(0.1)

    if request_result is not None:
        # Printed regardless of no_output, as in the original implementation.
        print(clear_line, end="")

    return request_result
|
||
|
||
def _padded_traffic_pair(rxb, txb):
    """Return ("↓<rx size>", "↑<tx size>"), space-padded on the right to equal length."""
    rxb_str = "↓"+RNS.prettysize(rxb)
    txb_str = "↑"+RNS.prettysize(txb)
    width = max(len(rxb_str), len(txb_str))
    return rxb_str.ljust(width), txb_str.ljust(width)


def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=False, astats=False, lstats=False, sorting=None, sort_reverse=False,
                  remote=None, management_identity=None, remote_timeout=RNS.Transport.PATH_REQUEST_TIMEOUT, must_exit=True, rns_instance=None,
                  traffic_totals=False, discovered_interfaces=False, config_entries=False):
    """Collect and print Reticulum status information.

    Depending on the arguments this either lists discovered interfaces,
    or prints interface statistics obtained from the local instance or,
    when ``remote`` is given, from a remote transport instance.

    :param configdir: Config directory passed to ``RNS.Reticulum``.
    :param dispall: Also show interfaces that are hidden by default.
    :param verbosity: Added to the base log level of 3.
    :param name_filter: Only show interfaces whose name contains this
        string (case-insensitive).
    :param json: Print JSON instead of the human-readable listing.
    :param astats: Include announce statistics per interface.
    :param lstats: Include the link table entry count.
    :param sorting: Sort key name (rate/bitrate, rx, tx, rxs, txs, traffic,
        announces/announce, arx, atx, held); other values leave the order as is.
    :param sort_reverse: Sort ascending instead of descending.
    :param remote: Hex transport identity hash of a remote instance.
    :param management_identity: Path to the identity file used for remote
        management; required when ``remote`` is set.
    :param remote_timeout: Timeout passed to ``get_remote_status``.
    :param must_exit: Exit the process when done or on error instead of
        returning. Forced to ``False`` when ``rns_instance`` is supplied.
    :param rns_instance: Existing Reticulum instance to use.
    :param traffic_totals: Print total traffic counters.
    :param discovered_interfaces: List discovered interfaces and finish.
    :param config_entries: Like ``discovered_interfaces`` but with full
        details and configuration entries.
    :returns: ``None``. With ``must_exit`` set, exits with status 1 (no
        instance), 0 (discovered listing done), 20 (remote setup/query
        error), 2 (no statistics), or the default status after JSON output.
    """
    require_shared = not remote

    try:
        if rns_instance:
            reticulum = rns_instance
            must_exit = False
        else:
            reticulum = RNS.Reticulum(configdir=configdir, loglevel=3+verbosity, require_shared_instance=require_shared)

    except Exception:
        print("No shared RNS instance available to get status from")
        if must_exit: sys.exit(1)
        else: return

    link_count = None
    stats = None

    details = False
    if config_entries:
        discovered_interfaces = True
        details = True

    if discovered_interfaces:
        if_discovery = RNS.Discovery.InterfaceDiscovery(discover_interfaces=False)
        ifs = if_discovery.list_discovered_interfaces()
        print("")

        if json:
            # Imported under an alias so the boolean ``json`` parameter is
            # not rebound to the module.
            import json as json_module
            for entry in ifs:
                for key in entry:
                    if isinstance(entry[key], bytes):
                        entry[key] = RNS.hexrep(entry[key], delimit=False)

            print(json_module.dumps(ifs))

        else:
            filtered_ifs = [i for i in ifs if not name_filter or name_filter.lower() in i["name"].lower()]

            if details:
                for idx, i in enumerate(filtered_ifs):
                    try:
                        name = i["name"]
                        if_type = i["type"]
                        status = i["status"]

                        if status == "available": status_display = "Available"
                        elif status == "unknown": status_display = "Unknown"
                        elif status == "stale": status_display = "Stale"
                        else: status_display = status

                        now = time.time()
                        dago = now-i["discovered"]
                        hago = now-i["last_heard"]
                        discovered_display = f"{RNS.prettytime(dago, compact=True)} ago"
                        last_heard_display = f"{RNS.prettytime(hago, compact=True)} ago"
                        transport_str = "Enabled" if i["transport"] else "Disabled"

                        if i["latitude"] is not None and i["longitude"] is not None:
                            lat = round(i["latitude"], 4)
                            lon = round(i["longitude"], 4)
                            if i["height"] is not None: height = ", "+str(i["height"])+"m h"
                            else: height = ""
                            location = f"{lat}, {lon}{height}"
                        else:
                            location = "Unknown"

                        transport_id = None
                        network = None
                        if "transport_id" in i: transport_id = i["transport_id"]
                        # The network ID is only shown when it differs from the transport ID.
                        if "transport_id" in i and "network_id" in i and i["transport_id"] != i["network_id"]:
                            network = i["network_id"]

                        if idx > 0: print("\n"+"="*32+"\n")
                        if network: print(f"Network ID : {network}")
                        if transport_id: print(f"Transport ID : {transport_id}")

                        print(f"Name : {name}")
                        print(f"Type : {if_type}")
                        print(f"Status : {status_display}")
                        print(f"Transport : {transport_str}")
                        print(f"Distance : {i['hops']} hop{'' if i['hops'] == 1 else 's'}")
                        print(f"Discovered : {discovered_display}")
                        print(f"Last Heard : {last_heard_display}")
                        print(f"Location : {location}")

                        if "frequency" in i: print(f"Frequency : {i['frequency']:,} Hz")
                        if "bandwidth" in i: print(f"Bandwidth : {i['bandwidth']:,} Hz")
                        if "sf" in i: print(f"Sprd. Factor : {i['sf']}")
                        if "cr" in i: print(f"Coding Rate : {i['cr']}")
                        if "modulation" in i: print(f"Modulation : {i['modulation']}")
                        if "reachable_on" in i: print(f"Address : {i['reachable_on']}")
                        if "port" in i: print(f"Port : {i['port']}")

                        print(f"Stamp Value : {i['value']}")

                        print("\nConfiguration Entry:")
                        for line in i["config_entry"].split('\n'):
                            print(f" {line}")

                    except Exception:
                        # Best effort: a malformed entry must not abort the listing.
                        pass

            else:
                print(f"{'Name':<25} {'Type':<12} {'Status':<12} {'Last Heard':<12} {'Value':<8} {'Location':<15}")
                print("-" * 89)

                for i in filtered_ifs:
                    try:
                        name = i["name"][:24] + "…" if len(i["name"]) > 24 else i["name"]

                        if_type = i["type"].replace("Interface", "")

                        status = i["status"]
                        if status == "available": status_display = "✓ Available"
                        elif status == "unknown": status_display = "? Unknown"
                        elif status == "stale": status_display = "× Stale"
                        else: status_display = status

                        diff = time.time() - i["last_heard"]

                        if diff < 60: last_heard_display = "Just now"
                        elif diff < 3600: last_heard_display = f"{int(diff / 60)}m ago"
                        elif diff < 86400: last_heard_display = f"{int(diff / 3600)}h ago"
                        else: last_heard_display = f"{int(diff / 86400)}d ago"

                        value = str(i["value"])

                        if i["latitude"] is not None and i["longitude"] is not None:
                            lat = round(i["latitude"], 4)
                            lon = round(i["longitude"], 4)
                            location = f"{lat}, {lon}"
                        else:
                            location = "N/A"

                        print(f"{name:<25} {if_type:<12} {status_display:<12} {last_heard_display:<12} {value:<8} {location:<15}")

                    except Exception:
                        # Best effort: a malformed entry must not abort the listing.
                        pass

        if must_exit: sys.exit(0)
        else: return

    if remote:
        try:
            if management_identity is None:
                raise ValueError("Remote management requires an identity file. Use -i to specify the path to a management identity.")

            dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
            if len(remote) != dest_len:
                raise ValueError("Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2))

            try:
                identity_hash = bytes.fromhex(remote)
                destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.remote.management", identity_hash)
            except Exception as e:
                raise ValueError("Invalid destination entered. Check your input.") from e

            identity = RNS.Identity.from_file(os.path.expanduser(management_identity))
            if identity is None:
                raise ValueError("Could not load management identity from "+str(management_identity))

            remote_status = get_remote_status(destination_hash, lstats, identity, no_output=json, timeout=remote_timeout)
            if remote_status is not None:
                stats, link_count = remote_status

        except Exception as e:
            print(str(e))
            if must_exit: sys.exit(20)
            else: return

    else:
        if lstats:
            try: link_count = reticulum.get_link_count()
            except Exception: pass

        try: stats = reticulum.get_interface_stats()
        except Exception: pass

    if stats is not None:
        if json:
            import json as json_module
            # Convert bytes values (top level and one level into nested
            # dict entries) to hex strings so the result is serialisable.
            for s in stats:
                if isinstance(stats[s], bytes):
                    stats[s] = RNS.hexrep(stats[s], delimit=False)

                if isinstance(stats[s], (dict, list)):
                    for item in stats[s]:
                        if isinstance(item, dict):
                            for k in item:
                                if isinstance(item[k], bytes):
                                    item[k] = RNS.hexrep(item[k], delimit=False)

            print(json_module.dumps(stats))
            if must_exit: sys.exit()
            else: return

        interfaces = stats["interfaces"]
        if sorting is not None and isinstance(sorting, str):
            sort_keys = {
                "rate":      lambda i: i["bitrate"],
                "bitrate":   lambda i: i["bitrate"],
                "rx":        lambda i: i["rxb"],
                "tx":        lambda i: i["txb"],
                "rxs":       lambda i: i["rxs"],
                "txs":       lambda i: i["txs"],
                "traffic":   lambda i: i["rxb"]+i["txb"],
                "announces": lambda i: i["incoming_announce_frequency"]+i["outgoing_announce_frequency"],
                "announce":  lambda i: i["incoming_announce_frequency"]+i["outgoing_announce_frequency"],
                "arx":       lambda i: i["incoming_announce_frequency"],
                "atx":       lambda i: i["outgoing_announce_frequency"],
                "held":      lambda i: i["held_announces"],
            }
            sort_key = sort_keys.get(sorting.lower())
            if sort_key is not None:
                # Default order is descending; sort_reverse flips it.
                interfaces.sort(key=sort_key, reverse=not sort_reverse)

        # Interface name prefixes that are only listed when dispall is set.
        hidden_prefixes = (
            "LocalInterface[",
            "TCPInterface[Client",
            "BackboneInterface[Client on",
            "AutoInterfacePeer[",
            "WeaveInterfacePeer[",
            "I2PInterfacePeer[Connected peer",
        )
        mode_class = RNS.Interfaces.Interface.Interface

        for ifstat in interfaces:
            name = ifstat["name"]

            # Non-connectable I2P interfaces are never listed, even with dispall.
            if name.startswith("I2PInterface[") and ifstat.get("i2p_connectable") == False:
                continue
            if not dispall and name.startswith(hidden_prefixes):
                continue
            if name_filter is not None and name_filter.lower() not in name.lower():
                continue

            print("")

            ss = "Up" if ifstat["status"] else "Down"

            mode = ifstat["mode"]
            if mode == mode_class.MODE_ACCESS_POINT: modestr = "Access Point"
            elif mode == mode_class.MODE_POINT_TO_POINT: modestr = "Point-to-Point"
            elif mode == mode_class.MODE_ROAMING: modestr = "Roaming"
            elif mode == mode_class.MODE_BOUNDARY: modestr = "Boundary"
            elif mode == mode_class.MODE_GATEWAY: modestr = "Gateway"
            else: modestr = "Full"

            clients = ifstat["clients"]
            clients_string = ""
            if clients is not None:
                if name.startswith("Shared Instance["):
                    # One client is subtracted from the reported count.
                    cnum = max(clients-1, 0)
                    spec_str = " program" if cnum == 1 else " programs"
                    clients_string = "Serving : "+str(cnum)+spec_str
                elif name.startswith("I2PInterface["):
                    if ifstat.get("i2p_connectable") == True:
                        spec_str = " connected I2P endpoint" if clients == 1 else " connected I2P endpoints"
                        clients_string = "Peers : "+str(clients)+spec_str
                else:
                    clients_string = "Clients : "+str(clients)

            print(" {n}".format(n=ifstat["name"]))

            if ifstat.get("autoconnect_source") is not None:
                print(" Source : Auto-connect via <{ns}>".format(ns=ifstat["autoconnect_source"]))

            if ifstat.get("ifac_netname") is not None:
                print(" Network : {nn}".format(nn=ifstat["ifac_netname"]))

            print(" Status : {ss}".format(ss=ss))

            if clients_string != "":
                print(" "+clients_string)

            if not name.startswith(("Shared Instance[", "TCPInterface[Client", "LocalInterface[")):
                print(" Mode : {mode}".format(mode=modestr))

            if ifstat.get("bitrate") is not None:
                print(" Rate : {ss}".format(ss=speed_str(ifstat["bitrate"])))

            if "noise_floor" in ifstat:
                if "interference" not in ifstat:
                    nstr = ""
                else:
                    nf = ifstat["interference"]
                    last_str = ", no interference"
                    # Guard against a missing/None timestamp so the age
                    # calculation cannot raise.
                    if ifstat.get("interference_last_ts") is not None and "interference_last_dbm" in ifstat:
                        lago = time.time()-ifstat["interference_last_ts"]
                        ldbm = ifstat["interference_last_dbm"]
                        last_str = f"\n Intrfrnc. : {ldbm} dBm {RNS.prettytime(lago, compact=True)} ago"

                    nstr = f"\n Intrfrnc. : {nf} dBm" if nf else last_str

                if ifstat["noise_floor"] is not None: print(" Noise Fl. : {nfl} dBm{ntr}".format(nfl=str(ifstat["noise_floor"]), ntr=nstr))
                else: print(" Noise Fl. : Unknown")

            if "cpu_load" in ifstat:
                if ifstat["cpu_load"] is not None: print(" CPU load : {v} %".format(v=str(ifstat["cpu_load"])))
                else: print(" CPU load : Unknown")

            if "cpu_temp" in ifstat:
                if ifstat["cpu_temp"] is not None: print(" CPU temp : {v}°C".format(v=str(ifstat["cpu_temp"])))
                else: print(" CPU temp : Unknown")

            if "mem_load" in ifstat:
                if ifstat["mem_load"] is not None: print(" Mem usage : {v} %".format(v=str(ifstat["mem_load"])))
                else: print(" Mem usage : Unknown")

            if ifstat.get("battery_percent") is not None:
                try:
                    bpi = int(ifstat["battery_percent"])
                    bss = ifstat["battery_state"]
                    print(f" Battery : {bpi}% ({bss})")
                except Exception:
                    # Best effort: skip the battery line on malformed data.
                    pass

            if "airtime_short" in ifstat and "airtime_long" in ifstat:
                print(" Airtime : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["airtime_short"]), atl=str(ifstat["airtime_long"])))

            if "channel_load_short" in ifstat and "channel_load_long" in ifstat:
                print(" Ch. Load : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["channel_load_short"]), atl=str(ifstat["channel_load_long"])))

            if "switch_id" in ifstat:
                if ifstat["switch_id"] is not None: print(" Switch ID : {v}".format(v=str(ifstat["switch_id"])))
                else: print(" Switch ID : Unknown")

            if "endpoint_id" in ifstat:
                if ifstat["endpoint_id"] is not None: print(" Endpoint : {v}".format(v=str(ifstat["endpoint_id"])))
                else: print(" Endpoint : Unknown")

            if "via_switch_id" in ifstat:
                if ifstat["via_switch_id"] is not None: print(" Via : {v}".format(v=str(ifstat["via_switch_id"])))
                else: print(" Via : Unknown")

            if ifstat.get("peers") is not None:
                print(" Peers : {np} reachable".format(np=ifstat["peers"]))

            if ifstat.get("tunnelstate") is not None:
                print(" I2P : {ts}".format(ts=ifstat["tunnelstate"]))

            if ifstat.get("ifac_signature") is not None:
                sigstr = "<…"+RNS.hexrep(ifstat["ifac_signature"][-5:], delimit=False)+">"
                print(" Access : {nb}-bit IFAC by {sig}".format(nb=ifstat["ifac_size"]*8, sig=sigstr))

            if ifstat.get("i2p_b32") is not None:
                print(" I2P B32 : {ep}".format(ep=str(ifstat["i2p_b32"])))

            if astats and ifstat.get("announce_queue") is not None and ifstat["announce_queue"] > 0:
                aqn = ifstat["announce_queue"]
                if aqn == 1: print(" Queued : {np} announce".format(np=aqn))
                else: print(" Queued : {np} announces".format(np=aqn))

            if astats and ifstat.get("held_announces") is not None and ifstat["held_announces"] > 0:
                aqn = ifstat["held_announces"]
                if aqn == 1: print(" Held : {np} announce".format(np=aqn))
                else: print(" Held : {np} announces".format(np=aqn))

            if astats and ifstat.get("incoming_announce_frequency") is not None:
                print(" Announces : {iaf}↑".format(iaf=RNS.prettyfrequency(ifstat["outgoing_announce_frequency"])))
                print(" {iaf}↓".format(iaf=RNS.prettyfrequency(ifstat["incoming_announce_frequency"])))

            rxstat, txstat = _padded_traffic_pair(ifstat["rxb"], ifstat["txb"])
            if "rxs" in ifstat and "txs" in ifstat:
                rxstat += " "+RNS.prettyspeed(ifstat["rxs"])
                txstat += " "+RNS.prettyspeed(ifstat["txs"])

            print(f" Traffic : {txstat}\n {rxstat}")

        has_transport_id = stats.get("transport_id") is not None

        lstr = ""
        if link_count is not None and lstats:
            ms = "y" if link_count == 1 else "ies"
            # With a transport ID the text is appended to the uptime line.
            if has_transport_id:
                lstr = f", {link_count} entr{ms} in link table"
            else:
                lstr = f" {link_count} entr{ms} in link table"

        if traffic_totals:
            rxb_str, txb_str = _padded_traffic_pair(stats["rxb"], stats["txb"])
            rxstat = rxb_str+" "+RNS.prettyspeed(stats["rxs"])
            txstat = txb_str+" "+RNS.prettyspeed(stats["txs"])
            print(f"\n Totals : {txstat}\n {rxstat}")

        if has_transport_id:
            print("\n Transport Instance "+RNS.prettyhexrep(stats["transport_id"])+" running")
            if stats.get("network_id") is not None:
                print(" Network Identity "+RNS.prettyhexrep(stats["network_id"]))
            if stats.get("probe_responder") is not None:
                print(" Probe responder at "+RNS.prettyhexrep(stats["probe_responder"])+ " active")
            if stats.get("transport_uptime") is not None:
                print(" Uptime is "+RNS.prettytime(stats["transport_uptime"])+lstr)

        elif lstr != "":
            print(f"\n{lstr}")

        print("")

    else:
        if not remote:
            print("Could not get RNS status")
        else:
            print("Could not get RNS status from remote transport instance "+RNS.prettyhexrep(identity_hash))

        if must_exit:
            sys.exit(2)
        else:
            return
|
||
|
||
def main(must_exit=True, rns_instance=None):
    """Command-line entry point: parse arguments and run ``program_setup``.

    In monitor mode (``-m``) the status output is rendered into a buffer,
    the terminal is cleared, and the buffer is printed, repeating every
    ``--monitor-interval`` seconds until interrupted.

    :param must_exit: Exit the process on completion/error instead of
        returning to the caller.
    :param rns_instance: Existing Reticulum instance to use instead of
        creating one.
    """
    try:
        parser = argparse.ArgumentParser(description="Reticulum Network Stack Status")
        parser.add_argument("--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str)
        parser.add_argument("--version", action="version", version="rnstatus {version}".format(version=__version__))

        parser.add_argument("-a", "--all", action="store_true", help="show all interfaces", default=False)
        parser.add_argument("-A", "--announce-stats", action="store_true", help="show announce stats", default=False)
        parser.add_argument("-l", "--link-stats", action="store_true", help="show link stats", default=False)
        parser.add_argument("-t", "--totals", action="store_true", help="display traffic totals", default=False)
        parser.add_argument("-s", "--sort", action="store", help="sort interfaces by [rate, traffic, rx, tx, rxs, txs, announces, arx, atx, held]", default=None, type=str)
        parser.add_argument("-r", "--reverse", action="store_true", help="reverse sorting", default=False)
        parser.add_argument("-j", "--json", action="store_true", help="output in JSON format", default=False)
        parser.add_argument("-R", action="store", metavar="hash", help="transport identity hash of remote instance to get status from", default=None, type=str)
        parser.add_argument("-i", action="store", metavar="path", help="path to identity used for remote management", default=None, type=str)
        parser.add_argument("-w", action="store", metavar="seconds", type=float, help="timeout before giving up on remote queries", default=RNS.Transport.PATH_REQUEST_TIMEOUT)
        parser.add_argument("-d", "--discovered", action="store_true", help="list discovered interfaces", default=False)
        parser.add_argument("-D", action="store_true", help="show details and config entries for discovered interfaces", default=False)
        parser.add_argument("-m", "--monitor", action="store_true", help="continuously monitor status", default=False)
        parser.add_argument("-I", "--monitor-interval", action="store", metavar="seconds", type=float, help="refresh interval for monitor mode (default: 1)", default=1.0)
        parser.add_argument('-v', '--verbose', action='count', default=0)
        parser.add_argument("filter", nargs="?", default=None, help="only display interfaces with names including filter", type=str)

        args = parser.parse_args()

        configarg = args.config if args.config else None

        # Arguments shared by the one-shot and the monitor invocation.
        setup_args = dict(
            configdir=configarg, dispall=args.all, verbosity=args.verbose, name_filter=args.filter, json=args.json,
            astats=args.announce_stats, lstats=args.link_stats, sorting=args.sort, sort_reverse=args.reverse, remote=args.R,
            management_identity=args.i, remote_timeout=args.w, traffic_totals=args.totals,
            discovered_interfaces=args.discovered, config_entries=args.D,
        )

        if args.monitor:
            if rns_instance:
                # Reuse the caller's instance rather than creating a second one.
                reticulum = rns_instance
            else:
                require_shared = not args.R
                try:
                    reticulum = RNS.Reticulum(configdir=configarg, loglevel=3+args.verbose, require_shared_instance=require_shared)
                except Exception:
                    print("No shared RNS instance available to get status from")
                    if must_exit: sys.exit(1)
                    else: return

            # time.sleep() rejects negative values.
            interval = max(0.0, args.monitor_interval)

            while True:
                # Render into a buffer first so the screen is cleared and
                # redrawn in one go.
                buffer = io.StringIO()
                old_stdout = sys.stdout
                sys.stdout = buffer

                try:
                    program_setup(must_exit=False, rns_instance=reticulum, **setup_args)

                finally:
                    sys.stdout = old_stdout

                output = buffer.getvalue()
                print("\033[H\033[2J", end="")
                print(output, end="", flush=True)

                time.sleep(interval)

        else:
            program_setup(must_exit=must_exit, rns_instance=rns_instance, **setup_args)

    except KeyboardInterrupt:
        print("")
        if must_exit: sys.exit()
        else: return
|
||
|
||
def speed_str(num, suffix='bps'):
    """Format a data rate as a human-readable string using decimal (1000-based) prefixes.

    :param num: The rate to format. Interpreted as bits per second.
    :param suffix: Unit suffix appended after the prefix. When it is
        ``'Bps'`` the value is divided by 8 (bits to bytes) and an
        upper-case ``K`` is used for the kilo prefix.
    :returns: A string with two decimals, such as ``"1.50 kbps"``.
    """
    units = ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']
    last_unit = 'Y'

    if suffix == 'Bps':
        num /= 8
        units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']

    for unit in units:
        if abs(num) < 1000.0:
            return "%3.2f %s%s" % (num, unit, suffix)
        num /= 1000.0

    # Anything that overflows the table is expressed in the largest prefix.
    return "%.2f %s%s" % (num, last_unit, suffix)
|
||
|
||
# Run the command-line interface when executed as a script.
if __name__ == "__main__":
    main()
|