markqvist___Reticulum/RNS/Utilities/rnstatus.py

687 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
# its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
# an artificial intelligence, machine learning or language model training
# dataset, including but not limited to any use that contributes to the
# training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import RNS
import os
import sys
import time
import argparse
import io
from RNS._version import __version__
def size_str(num, suffix='B'):
    """
    Format a size as a human-readable string with decimal (1000-based) prefixes.

    :param num: The size to format. Interpreted as a byte count when *suffix* is ``'b'``.
    :param suffix: Unit suffix appended after the prefix. When it is ``'b'``,
        *num* is multiplied by 8 before formatting (bytes to bits).
    :returns: A string such as ``"999 B"``, ``"1.50 KB"`` or ``"12.00 Kb"``.
        Values below 1000 are printed without decimals, everything else with
        two decimals.
    """
    units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']
    last_unit = 'Y'

    if suffix == 'b':
        num *= 8

    for unit in units:
        if abs(num) < 1000.0:
            if unit == "":
                return "%.0f %s%s" % (num, unit, suffix)
            return "%.2f %s%s" % (num, unit, suffix)
        num /= 1000.0

    # Anything that did not fit the prefixes above is reported with the last
    # unit, separated from the number by a space like every other branch.
    return "%.2f %s%s" % (num, last_unit, suffix)
# Shared state for remote status requests. The request callbacks defined
# inside get_remote_status() write these, and get_remote_status() polls them.
# Holds the (status, link_count) tuple once a list response has been received.
request_result = None
# Set to True once the remote request has either been answered or has failed.
request_concluded = False
def get_remote_status(destination_hash, include_lstats, identity, no_output=False, timeout=RNS.Transport.PATH_REQUEST_TIMEOUT):
    """
    Request interface statistics from a remote transport instance.

    Requests a path to the destination if none is known, establishes a link
    to its "rnstransport.remote.management" destination, identifies with
    *identity* and sends a "/status" request. Blocks (polling every 0.1
    seconds) until the request has been answered or has failed.

    :param destination_hash: Hash of the remote management destination.
    :param include_lstats: Sent as the request data; asks the remote to include link statistics.
    :param identity: The identity used to identify on the link.
    :param no_output: If ``True``, no progress or error text is printed.
    :param timeout: Seconds to wait for the path request before giving up.
    :returns: A ``(status, link_count)`` tuple, or ``None`` if the request
        failed or the response was not a non-empty list.

    Calls ``exit(12)`` if the path request times out, and ``exit(10)`` from
    the link-closed callback.
    """
    global request_result, request_concluded

    # Start from a clean slate. The state lives at module level, so without
    # this reset a second call would see the previous call's "concluded" flag
    # and immediately return the stale result.
    request_result = None
    request_concluded = False

    if not RNS.Transport.has_path(destination_hash):
        if not no_output:
            print("Path to "+RNS.prettyhexrep(destination_hash)+" requested", end=" ")
            sys.stdout.flush()

        RNS.Transport.request_path(destination_hash)
        pr_time = time.time()
        while not RNS.Transport.has_path(destination_hash):
            time.sleep(0.1)
            if time.time() - pr_time > timeout:
                if not no_output:
                    print("\r \r", end="")
                    print("Path request timed out")
                exit(12)

    remote_identity = RNS.Identity.recall(destination_hash)

    def remote_link_closed(link):
        if not no_output:
            if link.teardown_reason == RNS.Link.TIMEOUT:
                closed_message = "The link timed out, exiting now"
            elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED:
                closed_message = "The link was closed by the server, exiting now"
            else:
                closed_message = "Link closed unexpectedly, exiting now"

            print("\r \r", end="")
            print(closed_message)

        exit(10)

    def request_failed(request_receipt):
        global request_result, request_concluded
        if not no_output:
            print("\r \r", end="")
            print("The remote status request failed. Likely authentication failure.")

        request_concluded = True

    def got_response(request_receipt):
        global request_result, request_concluded
        response = request_receipt.response

        # An empty list carries no status. Guarding here keeps an IndexError
        # from escaping before the request is marked as concluded, which
        # would leave the caller waiting forever.
        if isinstance(response, list) and len(response) > 0:
            status = response[0]
            link_count = response[1] if len(response) > 1 else None
            request_result = (status, link_count)

        request_concluded = True

    def remote_link_established(link):
        if not no_output:
            print("\r \r", end="")
            print("Sending request...", end=" ")
            sys.stdout.flush()

        link.identify(identity)
        link.request("/status", data = [include_lstats], response_callback = got_response, failed_callback = request_failed)

    if not no_output:
        print("\r \r", end="")
        print("Establishing link with remote transport instance...", end=" ")
        sys.stdout.flush()

    remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "rnstransport", "remote", "management")
    link = RNS.Link(remote_destination)
    link.set_link_established_callback(remote_link_established)
    link.set_link_closed_callback(remote_link_closed)

    # NOTE(review): the link is left open when this function returns, so
    # repeated calls each create another link - confirm whether it should be
    # torn down here.
    while not request_concluded:
        time.sleep(0.1)

    # Clear the progress line, but only if progress text was printed at all.
    # Callers pass no_output=True for machine-readable output.
    if request_result is not None and not no_output:
        print("\r \r", end="")

    return request_result
def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=False, astats=False, lstats=False, sorting=None, sort_reverse=False,
                  remote=None, management_identity=None, remote_timeout=RNS.Transport.PATH_REQUEST_TIMEOUT, must_exit=True, rns_instance=None,
                  traffic_totals=False, discovered_interfaces=False, config_entries=False):
    """
    Collect Reticulum status information and print it to stdout.

    Depending on the arguments this prints the list of discovered interfaces,
    or the interface statistics of the local shared instance or of a remote
    transport instance.

    :param configdir: Path to the Reticulum config directory, or ``None``.
    :param dispall: Also show interfaces that are hidden by default.
    :param verbosity: Added to the base log level of 3.
    :param name_filter: Only show interfaces whose name contains this text (case-insensitive).
    :param json: Print JSON instead of the human-readable report.
    :param astats: Include announce statistics.
    :param lstats: Include the link table entry count.
    :param sorting: Name of the field to sort interfaces by, or ``None``.
    :param sort_reverse: Reverse the sort order (the default order is descending).
    :param remote: Hex transport identity hash of a remote instance to query.
    :param management_identity: Path to the identity file used for remote management.
    :param remote_timeout: Timeout passed on to the remote status request.
    :param must_exit: If ``True`` the function ends by calling ``exit()``, otherwise it returns.
    :param rns_instance: An already running Reticulum instance. When given, no new
        instance is created and ``must_exit`` is forced to ``False``.
    :param traffic_totals: Print the total traffic counters.
    :param discovered_interfaces: List discovered interfaces instead of interface statistics.
    :param config_entries: Show details and config entries for discovered interfaces
        (implies ``discovered_interfaces``).

    Exit codes used when ``must_exit`` is true: 1 if no Reticulum instance could be
    obtained, 0 after listing discovered interfaces, 20 for invalid remote
    arguments and 2 if no statistics could be retrieved.
    """
    require_shared = False if remote else True

    try:
        if rns_instance:
            reticulum = rns_instance
            must_exit = False
        else:
            reticulum = RNS.Reticulum(configdir=configdir, loglevel=3+verbosity, require_shared_instance=require_shared)

    except Exception:
        print("No shared RNS instance available to get status from")
        if must_exit: exit(1)
        else: return

    link_count = None
    stats = None
    details = False
    if config_entries:
        discovered_interfaces = True
        details = True

    if discovered_interfaces:
        if_discovery = RNS.Discovery.InterfaceDiscovery(discover_interfaces=False)
        ifs = if_discovery.list_discovered_interfaces()
        print("")

        if json:
            # Imported under a different name so the boolean "json" parameter
            # is not rebound to the module.
            import json as json_module
            for i in ifs:
                for e in i:
                    if isinstance(i[e], bytes): i[e] = RNS.hexrep(i[e], delimit=False)

            print(json_module.dumps(ifs))

        else:
            filtered_ifs = []
            for i in ifs:
                name = i["name"]
                if not name_filter or name_filter.lower() in name.lower(): filtered_ifs.append(i)

            if details:
                for idx, i in enumerate(filtered_ifs):
                    try:
                        name = i["name"]
                        if_type = i["type"]
                        status = i["status"]

                        if status == "available": status_display = "Available"
                        elif status == "unknown": status_display = "Unknown"
                        elif status == "stale": status_display = "Stale"
                        else: status_display = status

                        now = time.time()
                        dago = now-i["discovered"]
                        hago = now-i["last_heard"]
                        discovered_display = f"{RNS.prettytime(dago, compact=True)} ago"
                        last_heard_display = f"{RNS.prettytime(hago, compact=True)} ago"

                        transport_str = "Enabled" if i["transport"] else "Disabled"

                        if i["latitude"] is not None and i["longitude"] is not None:
                            lat = round(i["latitude"], 4)
                            lon = round(i["longitude"], 4)
                            if i["height"] != None: height = ", "+str(i["height"])+"m h"
                            else: height = ""
                            location = f"{lat}, {lon}{height}"
                        else: location = "Unknown"

                        transport_id = None
                        network = None
                        if "transport_id" in i: transport_id = i["transport_id"]
                        # The network ID is only shown when it differs from the transport ID
                        if "transport_id" in i and "network_id" in i and i["transport_id"] != i["network_id"]:
                            network = i["network_id"]

                        if idx > 0: print("\n"+"="*32+"\n")
                        if network: print(f"Network ID : {network}")
                        if transport_id: print(f"Transport ID : {transport_id}")
                        print(f"Name : {name}")
                        print(f"Type : {if_type}")
                        print(f"Status : {status_display}")
                        print(f"Transport : {transport_str}")
                        print(f"Distance : {i['hops']} hop{'' if i['hops'] == 1 else 's'}")
                        print(f"Discovered : {discovered_display}")
                        print(f"Last Heard : {last_heard_display}")
                        print(f"Location : {location}")
                        if "frequency" in i: print(f"Frequency : {i['frequency']:,} Hz")
                        if "bandwidth" in i: print(f"Bandwidth : {i['bandwidth']:,} Hz")
                        if "sf" in i: print(f"Sprd. Factor : {i['sf']}")
                        if "cr" in i: print(f"Coding Rate : {i['cr']}")
                        if "modulation" in i: print(f"Modulation : {i['modulation']}")
                        if "reachable_on" in i: print(f"Address : {i['reachable_on']}")
                        if "port" in i: print(f"Port : {i['port']}")
                        print(f"Stamp Value : {i['value']}")
                        print(f"\nConfiguration Entry:")
                        config_lines = i["config_entry"].split('\n')
                        for line in config_lines: print(f" {line}")

                    except Exception:
                        # Best effort: an entry that cannot be rendered is
                        # skipped so the remaining entries are still listed.
                        pass

            else:
                print(f"{'Name':<25} {'Type':<12} {'Status':<12} {'Last Heard':<12} {'Value':<8} {'Location':<15}")
                print("-" * 89)

                for i in filtered_ifs:
                    try:
                        name = i["name"][:24] + "" if len(i["name"]) > 24 else i["name"]
                        if_type = i["type"].replace("Interface", "")
                        status = i["status"]

                        if status == "available": status_display = "✓ Available"
                        elif status == "unknown": status_display = "? Unknown"
                        elif status == "stale": status_display = "× Stale"
                        else: status_display = status

                        now = time.time()
                        last_heard = i["last_heard"]
                        diff = now - last_heard
                        if diff < 60: last_heard_display = "Just now"
                        elif diff < 3600:
                            mins = int(diff / 60)
                            last_heard_display = f"{mins}m ago"
                        elif diff < 86400:
                            hours = int(diff / 3600)
                            last_heard_display = f"{hours}h ago"
                        else:
                            days = int(diff / 86400)
                            last_heard_display = f"{days}d ago"

                        value = str(i["value"])

                        if i["latitude"] is not None and i["longitude"] is not None:
                            lat = round(i["latitude"], 4)
                            lon = round(i["longitude"], 4)
                            location = f"{lat}, {lon}"
                        else: location = "N/A"

                        print(f"{name:<25} {if_type:<12} {status_display:<12} {last_heard_display:<12} {value:<8} {location:<15}")

                    except Exception:
                        # Best effort: an entry that cannot be rendered is
                        # skipped so the remaining entries are still listed.
                        pass

        if must_exit: exit(0)
        else: return

    if remote:
        try:
            if management_identity is None:
                raise ValueError("Remote management requires an identity file. Use -i to specify the path to a management identity.")

            dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
            if len(remote) != dest_len:
                raise ValueError("Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2))

            try:
                identity_hash = bytes.fromhex(remote)
                destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.remote.management", identity_hash)
            except Exception as e:
                raise ValueError("Invalid destination entered. Check your input.") from e

            identity = RNS.Identity.from_file(os.path.expanduser(management_identity))
            if identity is None:
                raise ValueError("Could not load management identity from "+str(management_identity))

            remote_status = get_remote_status(destination_hash, lstats, identity, no_output=json, timeout=remote_timeout)
            if remote_status is not None:
                stats, link_count = remote_status

        except Exception as e:
            print(str(e))
            if must_exit: exit(20)
            else: return

    else:
        # Failures are tolerated here: missing stats are reported further
        # down, and a missing link count is simply not displayed.
        if lstats:
            try: link_count = reticulum.get_link_count()
            except Exception: pass

        try: stats = reticulum.get_interface_stats()
        except Exception: pass

    if stats is not None:
        if json:
            # Imported under a different name so the boolean "json" parameter
            # is not rebound to the module.
            import json as json_module

            # bytes are not JSON serialisable, so they are replaced by hex
            # strings, both at the top level and inside lists of dicts.
            for s in stats:
                if isinstance(stats[s], bytes):
                    stats[s] = RNS.hexrep(stats[s], delimit=False)

                if isinstance(stats[s], dict) or isinstance(stats[s], list):
                    for i in stats[s]:
                        if isinstance(i, dict):
                            for k in i:
                                if isinstance(i[k], bytes):
                                    i[k] = RNS.hexrep(i[k], delimit=False)

            print(json_module.dumps(stats))
            if must_exit: exit()
            else: return

        interfaces = stats["interfaces"]
        if sorting is not None and isinstance(sorting, str):
            sorting = sorting.lower()

            def stat_value(entry, key):
                # Not every interface reports every counter (the display code
                # below treats several of them as optional). A missing or None
                # value sorts as zero instead of raising.
                if key in entry and entry[key] is not None: return entry[key]
                return 0

            sort_keys = {
                "rate":      lambda i: stat_value(i, "bitrate"),
                "bitrate":   lambda i: stat_value(i, "bitrate"),
                "rx":        lambda i: stat_value(i, "rxb"),
                "tx":        lambda i: stat_value(i, "txb"),
                "rxs":       lambda i: stat_value(i, "rxs"),
                "txs":       lambda i: stat_value(i, "txs"),
                "traffic":   lambda i: stat_value(i, "rxb")+stat_value(i, "txb"),
                "announces": lambda i: stat_value(i, "incoming_announce_frequency")+stat_value(i, "outgoing_announce_frequency"),
                "announce":  lambda i: stat_value(i, "incoming_announce_frequency")+stat_value(i, "outgoing_announce_frequency"),
                "arx":       lambda i: stat_value(i, "incoming_announce_frequency"),
                "atx":       lambda i: stat_value(i, "outgoing_announce_frequency"),
                "held":      lambda i: stat_value(i, "held_announces"),
            }

            # Unknown sort names leave the order untouched
            if sorting in sort_keys:
                interfaces.sort(key=sort_keys[sorting], reverse=not sort_reverse)

        # Interfaces with these name prefixes are only listed when dispall is set
        hidden_prefixes = (
            "LocalInterface[",
            "TCPInterface[Client",
            "BackboneInterface[Client on",
            "AutoInterfacePeer[",
            "WeaveInterfacePeer[",
            "I2PInterfacePeer[Connected peer",
        )

        interface_modes = RNS.Interfaces.Interface.Interface

        for ifstat in interfaces:
            name = ifstat["name"]

            # Non-connectable I2P interfaces are never listed, not even with dispall
            if name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False):
                continue

            if not dispall and name.startswith(hidden_prefixes):
                continue

            if name_filter is not None and not name_filter.lower() in name.lower():
                continue

            print("")

            if ifstat["status"]: ss = "Up"
            else: ss = "Down"

            if ifstat["mode"] == interface_modes.MODE_ACCESS_POINT: modestr = "Access Point"
            elif ifstat["mode"] == interface_modes.MODE_POINT_TO_POINT: modestr = "Point-to-Point"
            elif ifstat["mode"] == interface_modes.MODE_ROAMING: modestr = "Roaming"
            elif ifstat["mode"] == interface_modes.MODE_BOUNDARY: modestr = "Boundary"
            elif ifstat["mode"] == interface_modes.MODE_GATEWAY: modestr = "Gateway"
            else: modestr = "Full"

            clients = ifstat["clients"]
            clients_string = ""
            if clients is not None:
                if name.startswith("Shared Instance["):
                    # One is subtracted from the reported client count, never going below zero
                    cnum = max(clients-1,0)
                    spec_str = " program" if cnum == 1 else " programs"
                    clients_string = "Serving : "+str(cnum)+spec_str

                elif name.startswith("I2PInterface["):
                    if "i2p_connectable" in ifstat and ifstat["i2p_connectable"] == True:
                        cnum = clients
                        spec_str = " connected I2P endpoint" if cnum == 1 else " connected I2P endpoints"
                        clients_string = "Peers : "+str(cnum)+spec_str

                else:
                    clients_string = "Clients : "+str(clients)

            print(" {n}".format(n=ifstat["name"]))

            if "autoconnect_source" in ifstat and ifstat["autoconnect_source"] is not None:
                print(" Source : Auto-connect via <{ns}>".format(ns=ifstat["autoconnect_source"]))

            if "ifac_netname" in ifstat and ifstat["ifac_netname"] is not None:
                print(" Network : {nn}".format(nn=ifstat["ifac_netname"]))

            print(" Status : {ss}".format(ss=ss))

            if clients is not None and clients_string != "":
                print(" "+clients_string)

            if not name.startswith(("Shared Instance[", "TCPInterface[Client", "LocalInterface[")):
                print(" Mode : {mode}".format(mode=modestr))

            if "bitrate" in ifstat and ifstat["bitrate"] is not None:
                print(" Rate : {ss}".format(ss=speed_str(ifstat["bitrate"])))

            if "noise_floor" in ifstat:
                if not "interference" in ifstat: nstr = ""
                else:
                    nf = ifstat["interference"]
                    last_str = ", no interference"
                    if "interference_last_ts" in ifstat and "interference_last_dbm" in ifstat:
                        lago = time.time()-ifstat["interference_last_ts"]
                        ldbm = ifstat["interference_last_dbm"]
                        last_str = f"\n Intrfrnc. : {ldbm} dBm {RNS.prettytime(lago, compact=True)} ago"

                    # The current interference level takes precedence over the last recorded one
                    nstr = f"\n Intrfrnc. : {nf} dBm" if nf else last_str

                if ifstat["noise_floor"] is not None: print(" Noise Fl. : {nfl} dBm{ntr}".format(nfl=str(ifstat["noise_floor"]), ntr=nstr))
                else: print(" Noise Fl. : Unknown")

            if "cpu_load" in ifstat:
                if ifstat["cpu_load"] is not None: print(" CPU load : {v} %".format(v=str(ifstat["cpu_load"])))
                else: print(" CPU load : Unknown")

            if "cpu_temp" in ifstat:
                if ifstat["cpu_temp"] is not None: print(" CPU temp : {v}°C".format(v=str(ifstat["cpu_temp"])))
                else: print(" CPU temp : Unknown")

            if "mem_load" in ifstat:
                if ifstat["mem_load"] is not None: print(" Mem usage : {v} %".format(v=str(ifstat["mem_load"])))
                else: print(" Mem usage : Unknown")

            if "battery_percent" in ifstat and ifstat["battery_percent"] is not None:
                try:
                    bpi = int(ifstat["battery_percent"])
                    bss = ifstat["battery_state"]
                    print(f" Battery : {bpi}% ({bss})")
                except Exception:
                    # Battery information is optional; skip it if it cannot be rendered
                    pass

            if "airtime_short" in ifstat and "airtime_long" in ifstat:
                print(" Airtime : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["airtime_short"]),atl=str(ifstat["airtime_long"])))

            if "channel_load_short" in ifstat and "channel_load_long" in ifstat:
                print(" Ch. Load : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["channel_load_short"]),atl=str(ifstat["channel_load_long"])))

            if "switch_id" in ifstat:
                if ifstat["switch_id"] is not None: print(" Switch ID : {v}".format(v=str(ifstat["switch_id"])))
                else: print(" Switch ID : Unknown")

            if "endpoint_id" in ifstat:
                if ifstat["endpoint_id"] is not None: print(" Endpoint : {v}".format(v=str(ifstat["endpoint_id"])))
                else: print(" Endpoint : Unknown")

            if "via_switch_id" in ifstat:
                if ifstat["via_switch_id"] is not None: print(" Via : {v}".format(v=str(ifstat["via_switch_id"])))
                else: print(" Via : Unknown")

            if "peers" in ifstat and ifstat["peers"] is not None:
                print(" Peers : {np} reachable".format(np=ifstat["peers"]))

            if "tunnelstate" in ifstat and ifstat["tunnelstate"] is not None:
                print(" I2P : {ts}".format(ts=ifstat["tunnelstate"]))

            if "ifac_signature" in ifstat and ifstat["ifac_signature"] is not None:
                # Only the last five bytes of the signature are shown
                sigstr = "<…"+RNS.hexrep(ifstat["ifac_signature"][-5:], delimit=False)+">"
                print(" Access : {nb}-bit IFAC by {sig}".format(nb=ifstat["ifac_size"]*8, sig=sigstr))

            if "i2p_b32" in ifstat and ifstat["i2p_b32"] is not None:
                print(" I2P B32 : {ep}".format(ep=str(ifstat["i2p_b32"])))

            if astats and "announce_queue" in ifstat and ifstat["announce_queue"] is not None and ifstat["announce_queue"] > 0:
                aqn = ifstat["announce_queue"]
                if aqn == 1:
                    print(" Queued : {np} announce".format(np=aqn))
                else:
                    print(" Queued : {np} announces".format(np=aqn))

            if astats and "held_announces" in ifstat and ifstat["held_announces"] is not None and ifstat["held_announces"] > 0:
                aqn = ifstat["held_announces"]
                if aqn == 1:
                    print(" Held : {np} announce".format(np=aqn))
                else:
                    print(" Held : {np} announces".format(np=aqn))

            if astats and "incoming_announce_frequency" in ifstat and ifstat["incoming_announce_frequency"] is not None:
                print(" Announces : {iaf}".format(iaf=RNS.prettyfrequency(ifstat["outgoing_announce_frequency"])))
                print(" {iaf}".format(iaf=RNS.prettyfrequency(ifstat["incoming_announce_frequency"])))

            # Pad the shorter of the two size strings so that the optional
            # speed columns line up.
            rxb_str = ""+RNS.prettysize(ifstat["rxb"])
            txb_str = ""+RNS.prettysize(ifstat["txb"])
            size_width = max(len(rxb_str), len(txb_str))
            rxstat = rxb_str.ljust(size_width)
            txstat = txb_str.ljust(size_width)

            if "rxs" in ifstat and "txs" in ifstat:
                rxstat += " "+RNS.prettyspeed(ifstat["rxs"])
                txstat += " "+RNS.prettyspeed(ifstat["txs"])

            print(f" Traffic : {txstat}\n {rxstat}")

        lstr = ""
        if link_count is not None and lstats:
            ms = "y" if link_count == 1 else "ies"
            # With a transport ID the text is appended to the uptime line,
            # otherwise it is printed on a line of its own.
            if "transport_id" in stats and stats["transport_id"] is not None:
                lstr = f", {link_count} entr{ms} in link table"
            else:
                lstr = f" {link_count} entr{ms} in link table"

        if traffic_totals:
            rxb_str = ""+RNS.prettysize(stats["rxb"])
            txb_str = ""+RNS.prettysize(stats["txb"])
            size_width = max(len(rxb_str), len(txb_str))
            rxstat = rxb_str.ljust(size_width)+" "+RNS.prettyspeed(stats["rxs"])
            txstat = txb_str.ljust(size_width)+" "+RNS.prettyspeed(stats["txs"])

            print(f"\n Totals : {txstat}\n {rxstat}")

        if "transport_id" in stats and stats["transport_id"] is not None:
            print("\n Transport Instance "+RNS.prettyhexrep(stats["transport_id"])+" running")
            if "network_id" in stats and stats["network_id"] is not None:
                print(" Network Identity "+RNS.prettyhexrep(stats["network_id"]))
            if "probe_responder" in stats and stats["probe_responder"] is not None:
                print(" Probe responder at "+RNS.prettyhexrep(stats["probe_responder"])+ " active")
            if "transport_uptime" in stats and stats["transport_uptime"] is not None:
                print(" Uptime is "+RNS.prettytime(stats["transport_uptime"])+lstr)
        else:
            if lstr != "":
                print(f"\n{lstr}")

        print("")

    else:
        if not remote:
            print("Could not get RNS status")
        else:
            print("Could not get RNS status from remote transport instance "+RNS.prettyhexrep(identity_hash))

        if must_exit:
            exit(2)
        else:
            return
def main(must_exit=True, rns_instance=None):
    """
    Command line entry point: parse the arguments and print the status report.

    In monitor mode (``-m``) the report is regenerated in a loop. Each pass is
    rendered into a buffer first, so the screen is cleared and redrawn in one
    go.

    :param must_exit: If ``True`` the process is terminated with ``exit()``
        when done or on failure, otherwise the function returns.
    :param rns_instance: An already running Reticulum instance to use instead
        of creating a new one.
    """
    try:
        parser = argparse.ArgumentParser(description="Reticulum Network Stack Status")
        parser.add_argument("--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str)
        parser.add_argument("--version", action="version", version="rnstatus {version}".format(version=__version__))
        parser.add_argument("-a", "--all", action="store_true", help="show all interfaces", default=False)
        parser.add_argument("-A", "--announce-stats", action="store_true", help="show announce stats", default=False)
        parser.add_argument("-l", "--link-stats", action="store_true", help="show link stats", default=False)
        parser.add_argument("-t", "--totals", action="store_true", help="display traffic totals", default=False)
        parser.add_argument("-s", "--sort", action="store", help="sort interfaces by [rate, traffic, rx, tx, rxs, txs, announces, arx, atx, held]", default=None, type=str)
        parser.add_argument("-r", "--reverse", action="store_true", help="reverse sorting", default=False)
        parser.add_argument("-j", "--json", action="store_true", help="output in JSON format", default=False)
        parser.add_argument("-R", action="store", metavar="hash", help="transport identity hash of remote instance to get status from", default=None, type=str)
        parser.add_argument("-i", action="store", metavar="path", help="path to identity used for remote management", default=None, type=str)
        parser.add_argument("-w", action="store", metavar="seconds", type=float, help="timeout before giving up on remote queries", default=RNS.Transport.PATH_REQUEST_TIMEOUT)
        parser.add_argument("-d", "--discovered", action="store_true", help="list discovered interfaces", default=False)
        parser.add_argument("-D", action="store_true", help="show details and config entries for discovered interfaces", default=False)
        parser.add_argument("-m", "--monitor", action="store_true", help="continuously monitor status", default=False)
        parser.add_argument("-I", "--monitor-interval", action="store", metavar="seconds", type=float, help="refresh interval for monitor mode (default: 1)", default=1.0)
        parser.add_argument('-v', '--verbose', action='count', default=0)
        parser.add_argument("filter", nargs="?", default=None, help="only display interfaces with names including filter", type=str)

        args = parser.parse_args()

        configarg = args.config if args.config else None

        # Arguments that are identical for the one-shot and the monitor invocation
        setup_args = {
            "configdir": configarg, "dispall": args.all, "verbosity": args.verbose, "name_filter": args.filter, "json": args.json,
            "astats": args.announce_stats, "lstats": args.link_stats, "sorting": args.sort, "sort_reverse": args.reverse, "remote": args.R,
            "management_identity": args.i, "remote_timeout": args.w, "traffic_totals": args.totals,
            "discovered_interfaces": args.discovered, "config_entries": args.D,
        }

        if args.monitor:
            if rns_instance:
                # Reuse the instance handed in by the caller, as program_setup() does
                reticulum = rns_instance
            else:
                require_shared = not args.R
                try: reticulum = RNS.Reticulum(configdir=configarg, loglevel=3+args.verbose, require_shared_instance=require_shared)
                except Exception:
                    print("No shared RNS instance available to get status from")
                    if must_exit: exit(1)
                    else: return

            while True:
                # Render the report into a buffer, so the screen can be
                # cleared and redrawn in one go.
                buffer = io.StringIO()
                old_stdout = sys.stdout
                sys.stdout = buffer
                try:
                    program_setup(must_exit=False, rns_instance=reticulum, **setup_args)
                except SystemExit:
                    # Something below decided to terminate. Show what was
                    # captured so far, since it holds the reason for exiting.
                    sys.stdout = old_stdout
                    print(buffer.getvalue(), end="", flush=True)
                    raise
                finally:
                    sys.stdout = old_stdout

                output = buffer.getvalue()
                print("\033[H\033[2J", end="")
                print(output, end="", flush=True)

                # time.sleep() raises ValueError for negative values
                time.sleep(max(args.monitor_interval, 0))

        else:
            program_setup(must_exit=must_exit, rns_instance=rns_instance, **setup_args)

    except KeyboardInterrupt:
        print("")
        if must_exit: exit()
        else: return
def speed_str(num, suffix='bps'):
    """
    Format a transfer rate as a human-readable string with decimal
    (1000-based) prefixes and two decimals.

    :param num: The rate to format. Interpreted as bits per second when
        *suffix* is ``'Bps'``.
    :param suffix: Unit suffix appended after the prefix. When it is
        ``'Bps'``, *num* is divided by 8 before formatting (bits to bytes)
        and an upper-case ``K`` is used as the kilo prefix.
    :returns: A string such as ``"999.00 bps"`` or ``"1.20 kbps"``.
    """
    if suffix == 'Bps':
        num /= 8
        kilo = 'K'
    else:
        kilo = 'k'

    prefixes = ('', kilo, 'M', 'G', 'T', 'P', 'E', 'Z')
    overflow_prefix = 'Y'

    for prefix in prefixes:
        if not abs(num) < 1000.0:
            # Too large for this prefix, scale down and try the next one
            num /= 1000.0
            continue

        return "%3.2f %s%s" % (num, prefix, suffix)

    return "%.2f %s%s" % (num, overflow_prefix, suffix)
# Run the command line entry point when this file is executed as a script.
if __name__ == "__main__":
    main()