Updated WeaveInterface. Added support for Weave devices to rnstatus.

This commit is contained in:
Mark Qvist 2025-10-29 12:43:20 +01:00
commit ddf14e5636
3 changed files with 278 additions and 69 deletions

View file

@ -1,3 +1,33 @@
# Reticulum License
#
# Copyright (c) 2016-2025 Mark Qvist
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# - The Software shall not be used in any kind of system which includes amongst
# its functions the ability to purposefully do harm to human beings.
#
# - The Software shall not be used, directly or indirectly, in the creation of
# an artificial intelligence, machine learning or language model training
# dataset, including but not limited to any use that contributes to the
# training or development of such a model or algorithm.
#
# - The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import RNS
import threading
import time
@ -17,30 +47,54 @@ class HDLC():
return data
class WDCL():
WDCL_T_DISCOVER = 0x00
WDCL_T_CONNECT = 0x01
WDCL_T_CMD = 0x02
WDCL_T_LOG = 0x03
WDCL_T_DISP = 0x04
WDCL_T_ENDPOINT_PKT = 0x05
WDCL_T_ENCAP_PROTO = 0x06
WDCL_T_DISCOVER = 0x00
WDCL_T_CONNECT = 0x01
WDCL_T_CMD = 0x02
WDCL_T_LOG = 0x03
WDCL_T_DISP = 0x04
WDCL_T_ENDPOINT_PKT = 0x05
WDCL_T_ENCAP_PROTO = 0x06
WDCL_BROADCAST = bytes([0xFF, 0xFF, 0xFF, 0xFF])
WDCL_BROADCAST = bytes([0xFF, 0xFF, 0xFF, 0xFF])
HEADER_MINSIZE = 4+1
HW_MTU = 1500
MAX_CHUNK = 32768
port = None
speed = None
databits = None
parity = None
stopbits = None
serial = None
WDCL_HANDSHAKE_TIMEOUT = 2
HEADER_MINSIZE = 4+1
MAX_CHUNK = 32768
port = None
speed = None
databits = None
parity = None
stopbits = None
serial = None
def __init__(self, owner, device, port, as_interface=False):
import importlib.util
if importlib.util.find_spec('serial') != None: import serial
else: RNS.panic()
if RNS.vendor.platformutils.is_android():
self.on_android = True
if importlib.util.find_spec('usbserial4a') != None:
from usbserial4a import serial4a as serial
parity = "N"
if importlib.util.find_spec('jnius') == None:
RNS.log("Could not load jnius API wrapper for Android, RNode interface cannot be created.", RNS.LOG_CRITICAL)
RNS.log("This probably means you are trying to use an USB-based interface from within Termux or similar.", RNS.LOG_CRITICAL)
RNS.log("This is currently not possible, due to this environment limiting access to the native Android APIs.", RNS.LOG_CRITICAL)
RNS.panic()
else:
RNS.log("Could not load USB serial module for Android, Weave interface cannot be created.", RNS.LOG_CRITICAL)
RNS.panic()
else:
self.on_android = False
if importlib.util.find_spec('serial') != None:
import serial
parity = serial.PARITY_NONE
else:
RNS.log("Using the Weave interface requires a serial communication module to be installed.", RNS.LOG_CRITICAL)
RNS.log("You can install one with the command: python3 -m pip install pyserial", RNS.LOG_CRITICAL)
RNS.panic()
if port == None: raise ValueError("No port specified")
@ -59,7 +113,7 @@ class WDCL():
self.port = port
self.speed = 3000000
self.databits = 8
self.parity = serial.PARITY_NONE
self.parity = parity
self.stopbits = 1
self.timeout = 100
self.online = False
@ -67,6 +121,8 @@ class WDCL():
self.next_tx = 0
self.should_run = True
self.receiver = None
self.wdcl_connected = False
self.reconnecting = False
self.frame_queue = deque()
if not self.as_interface:
self.id = RNS.Identity.full_hash(port.hwid.encode("utf-8"))
@ -82,7 +138,7 @@ class WDCL():
RNS.log("Could not open serial port for interface "+str(self), RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.log("Reticulum will attempt to bring up this interface periodically", RNS.LOG_ERROR)
if not self.detached and not self.reconnecting:
if not self.owner.detached and not self.reconnecting:
thread = threading.Thread(target=self.reconnect_port)
thread.daemon = True
thread.start()
@ -98,42 +154,112 @@ class WDCL():
def open_port(self):
if self.as_interface:
RNS.log(f"Opening serial port {self.port}...", RNS.LOG_VERBOSE)
target_port = self.port
else:
self.owner.wlog(f"Opening serial port {self.port.device}...")
target_port = self.port.device
if not self.on_android:
if self.as_interface:
RNS.log(f"Opening serial port {self.port}...", RNS.LOG_VERBOSE)
target_port = self.port
else:
self.owner.wlog(f"Opening serial port {self.port.device}...")
target_port = self.port.device
self.serial = self.pyserial.Serial(
port = target_port,
baudrate = self.speed,
bytesize = self.databits,
parity = self.parity,
stopbits = self.stopbits,
xonxoff = False,
rtscts = False,
timeout = 0.250,
inter_byte_timeout = None,
write_timeout = None,
dsrdtr = False)
self.serial = self.pyserial.Serial(
port = target_port,
baudrate = self.speed,
bytesize = self.databits,
parity = self.parity,
stopbits = self.stopbits,
xonxoff = False,
rtscts = False,
timeout = 0.250,
inter_byte_timeout = None,
write_timeout = None,
dsrdtr = False)
else:
# Get device parameters
from usb4a import usb
device = usb.get_usb_device(self.port)
if device:
vid = device.getVendorId()
pid = device.getProductId()
# Driver overrides for speficic chips
proxy = self.pyserial.get_serial_port
if vid == 0x1A86 and pid == 0x55D4:
# Force CDC driver for Qinheng CH34x
RNS.log(str(self)+" using CDC driver for "+RNS.hexrep(vid)+":"+RNS.hexrep(pid), RNS.LOG_DEBUG)
from usbserial4a.cdcacmserial4a import CdcAcmSerial
proxy = CdcAcmSerial
self.serial = proxy(
self.port,
baudrate = self.speed,
bytesize = self.databits,
parity = self.parity,
stopbits = self.stopbits,
xonxoff = False,
rtscts = False,
timeout = None,
inter_byte_timeout = None,
# write_timeout = wtimeout,
dsrdtr = False,
)
if vid == 0x0403:
# Hardware parameters for FTDI devices @ 115200 baud
self.serial.DEFAULT_READ_BUFFER_SIZE = 16 * 1024
self.serial.USB_READ_TIMEOUT_MILLIS = 100
self.serial.timeout = 0.1
elif vid == 0x10C4:
# Hardware parameters for SiLabs CP210x @ 115200 baud
self.serial.DEFAULT_READ_BUFFER_SIZE = 64
self.serial.USB_READ_TIMEOUT_MILLIS = 12
self.serial.timeout = 0.012
elif vid == 0x1A86 and pid == 0x55D4:
# Hardware parameters for Qinheng CH34x @ 115200 baud
self.serial.DEFAULT_READ_BUFFER_SIZE = 64
self.serial.USB_READ_TIMEOUT_MILLIS = 12
self.serial.timeout = 0.1
else:
# Default values
self.serial.DEFAULT_READ_BUFFER_SIZE = 1 * 1024
self.serial.USB_READ_TIMEOUT_MILLIS = 100
self.serial.timeout = 0.1
RNS.log(str(self)+" USB read buffer size set to "+RNS.prettysize(self.serial.DEFAULT_READ_BUFFER_SIZE), RNS.LOG_DEBUG)
RNS.log(str(self)+" USB read timeout set to "+str(self.serial.USB_READ_TIMEOUT_MILLIS)+"ms", RNS.LOG_DEBUG)
RNS.log(str(self)+" USB write timeout set to "+str(self.serial.USB_WRITE_TIMEOUT_MILLIS)+"ms", RNS.LOG_DEBUG)
def close(self):
self.should_run = False
self.online = False
self.wdcl_connected = False
if self.serial:
self.serial.close()
if self.as_interface: RNS.LOG((f"Closed serial port {str(self.port)} for {str(self)}"), RNS.LOG_VERBOSE)
if self.as_interface: RNS.log((f"Closed serial port {str(self.port)} for {str(self)}"), RNS.LOG_VERBOSE)
else: self.owner.wlog(f"Closed serial port {str(self.port.device)} for {str(self)}")
def configure_device(self):
thread = threading.Thread(target=self.read_loop)
thread.daemon = True
thread.start()
self.online = True
if self.as_interface: RNS.log(f"Serial port {self.port} is now open, discovering remote device...", RNS.LOG_VERBOSE)
else: self.owner.wlog("Serial port "+self.port.device+" is now open")
self.device.discover()
if self.as_interface:
timeout = time.time() + self.WDCL_HANDSHAKE_TIMEOUT
while time.time() < timeout and not self.wdcl_connected: time.sleep(0.1)
if not self.wdcl_connected:
raise IOError(f"WDCL connection handshake timed out for {self}")
self.online = False
self.wdcl_connected = False
if self.serial:
try: self.serial.close()
except Exception as e: RNS.log("Error while cleaning serial connection: {e}", RNS.LOG_ERROR)
self.online = True
def process_incoming(self, data):
self.rxb += len(data)
if self.device:
@ -142,7 +268,7 @@ class WDCL():
else: self.frame_queue.append(data)
def process_outgoing(self, data):
if self.online:
if self.serial.is_open:
data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
written = self.serial.write(data)
self.txb += len(data)
@ -176,6 +302,7 @@ class WDCL():
except Exception as e:
self.online = False
self.wdcl_connected = False
if self.should_run:
if self.as_interface:
RNS.log("A serial port error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
@ -185,12 +312,18 @@ class WDCL():
self.owner.wlog("Will attempt to reconnect the interface periodically.")
RNS.trace_exception(e)
RNS.log("READ LOOP EXIT")
self.online = False
self.wdcl_connected = False
try: self.serial.close()
except: pass
if self.should_run: self.reconnect_port()
def reconnect_port(self):
if self.reconnecting: return
self.reconnecting = True
self.wdcl_connected = False
while not self.online:
try:
time.sleep(5)
@ -202,6 +335,7 @@ class WDCL():
if self.as_interface: RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR)
else: self.owner.wlog("Error while reconnecting port, the contained exception was: "+str(e))
self.reconnecting = False
if self.as_interface: RNS.log("Reconnected serial port for "+str(self), RNS.LOG_INFO)
else: self.owner.wlog("Reconnected serial port for "+str(self))
@ -255,6 +389,7 @@ class Evt():
ET_PROTO_WDCL_INIT = 0x3000
ET_PROTO_WDCL_RUNNING = 0x3001
ET_PROTO_WDCL_CONNECTION = 0x3002
ET_PROTO_WDCL_HOST_ENDPOINT = 0x3003
ET_PROTO_WEAVE_INIT = 0x3100
ET_PROTO_WEAVE_RUNNING = 0x3101
ET_PROTO_WEAVE_EP_ALIVE = 0x3102
@ -320,6 +455,7 @@ class Evt():
ET_PROTO_WDCL_INIT: "WDCL protocol initialization",
ET_PROTO_WDCL_RUNNING: "WDCL protocol activation",
ET_PROTO_WDCL_CONNECTION: "WDCL host connection",
ET_PROTO_WDCL_HOST_ENDPOINT: "Weave host endpoint",
ET_PROTO_WEAVE_INIT: "Weave protocol initialization",
ET_PROTO_WEAVE_RUNNING: "Weave protocol activation",
ET_PROTO_WEAVE_EP_ALIVE: "Weave endpoint appeared",
@ -436,23 +572,24 @@ class WeaveEndpoint():
self.received.append(data)
class WeaveDevice():
STATLEN_MAX = 120
STAT_UPDATE_THROTTLE = 0.5
STATLEN_MAX = 120
STAT_UPDATE_THROTTLE = 0.5
WEAVE_SWITCH_ID_LEN = 4
WEAVE_ENDPOINT_ID_LEN = 8
WEAVE_FLOWSEQ_LEN = 2
WEAVE_HMAC_LEN = 8
WEAVE_AUTH_LEN = WEAVE_ENDPOINT_ID_LEN+WEAVE_HMAC_LEN
WEAVE_SWITCH_ID_LEN = 4
WEAVE_ENDPOINT_ID_LEN = 8
WEAVE_FLOWSEQ_LEN = 2
WEAVE_HMAC_LEN = 8
WEAVE_AUTH_LEN = WEAVE_ENDPOINT_ID_LEN+WEAVE_HMAC_LEN
WEAVE_PUBKEY_SIZE = 32
WEAVE_PRVKEY_SIZE = 64
WEAVE_SIGNATURE_LEN = 64
WEAVE_PUBKEY_SIZE = 32
WEAVE_PRVKEY_SIZE = 64
WEAVE_SIGNATURE_LEN = 64
def __init__(self, as_interface=False, rns_interface=None):
self.identity = None
self.receiver = None
self.switch_id = None
self.endpoint_id = None
self.owner = None
self.rns_interface = rns_interface
self.as_interface = as_interface
@ -507,7 +644,8 @@ class WeaveDevice():
data = self.connection.switch_pub_bytes
data += signature
self.wdcl_send(WDCL.WDCL_T_CONNECT, data)
if not self.as_interface: self.receiver.log("Connection handshake sent")
if self.as_interface: RNS.log(f"WDCL connection handshake sent", RNS.LOG_VERBOSE)
else: self.receiver.log("Connection handshake sent")
def capture_stats_cpu(self):
self.cpu_stats.append({"timestamp": time.time(), "cpu_load": self.cpu_load})
@ -620,6 +758,8 @@ class WeaveDevice():
def log_handle(self, frame):
# Handle system event signalling
if frame.event == Evt.ET_PROTO_WDCL_CONNECTION: self.connection.wdcl_connected = True
if frame.event == Evt.ET_PROTO_WDCL_HOST_ENDPOINT and len(frame.data) == self.WEAVE_ENDPOINT_ID_LEN: self.endpoint_id = frame.data
if frame.event == Evt.ET_PROTO_WEAVE_EP_ALIVE and len(frame.data) == 8:
self.endpoint_alive(frame.data)
elif frame.event == Evt.ET_STAT_TASK_CPU: self.active_tasks[frame.data[1:].decode("utf-8")] = { "cpu_load": frame.data[0], "timestamp": time.time() }
@ -694,6 +834,26 @@ class WeaveInterface(Interface):
MULTI_IF_DEQUE_LEN = 48
MULTI_IF_DEQUE_TTL = 0.75
@property
def cpu_load(self):
if not self.device: return None
else: return self.device.cpu_load
@property
def mem_load(self):
if not self.device: return None
else: return self.device.memory_used_pct
@property
def switch_id(self):
if not self.device: return None
else: return self.device.switch_id
@property
def endpoint_id(self):
if not self.device: return None
else: return self.device.endpoint_id
def __init__(self, owner, configuration):
c = Interface.get_config_obj(configuration)
name = c["name"]
@ -710,10 +870,8 @@ class WeaveInterface(Interface):
self.name = name
self.port = port
self.switch_identity = RNS.Identity()
self.device = WeaveDevice(as_interface=True, rns_interface=self)
self.connection = WDCL(owner=self, device=self.device, port=self.port, as_interface=True)
self.owner = owner
self.online = False
self._online = False
self.final_init_done = False
self.peers = {}
self.timed_out_interfaces = {}
@ -730,14 +888,15 @@ class WeaveInterface(Interface):
if configured_bitrate != None: self.bitrate = configured_bitrate
else: self.bitrate = WeaveInterface.BITRATE_GUESS
self.final_init()
def final_init(self):
self.device = WeaveDevice(as_interface=True, rns_interface=self)
self.connection = WDCL(owner=self, device=self.device, port=self.port, as_interface=True)
job_thread = threading.Thread(target=self.peer_jobs)
job_thread.daemon = True
job_thread.start()
self.online = True
self._online = True
self.final_init_done = True
def peer_jobs(self):
@ -760,7 +919,7 @@ class WeaveInterface(Interface):
spawned_interface = self.spawned_interfaces[peer_addr]
spawned_interface.detach()
spawned_interface.teardown()
RNS.log(str(self)+" removed peer "+str(peer_addr)+" on "+str(removed_peer[0]), RNS.LOG_DEBUG)
RNS.log(str(self)+" removed peer "+RNS.hexrep(peer_addr)+" on "+RNS.hexrep(removed_peer[0]), RNS.LOG_DEBUG)
@property
def peer_count(self):
@ -801,7 +960,7 @@ class WeaveInterface(Interface):
spawned_interface.announce_rate_penalty = self.announce_rate_penalty
spawned_interface.mode = self.mode
spawned_interface.HW_MTU = self.HW_MTU
spawned_interface.online = True
spawned_interface._online = True
RNS.Transport.interfaces.append(spawned_interface)
if endpoint_addr in self.spawned_interfaces:
self.spawned_interfaces[endpoint_addr].detach()
@ -830,8 +989,17 @@ class WeaveInterface(Interface):
return False
def detach(self):
self.online = False
self._online = False
@property
def online(self):
if not self._online: return False
else: return self.connection.online
@online.setter
def online(self, value):
self._online = value
def __str__(self):
return "WeaveInterface["+self.name+"]"
@ -846,12 +1014,22 @@ class WeaveInterfacePeer(Interface):
self.addr_info = None
self.HW_MTU = self.owner.HW_MTU
self.FIXED_MTU = self.owner.FIXED_MTU
self._online = False
def __str__(self):
return f"WeaveInterfacePeer[{RNS.hexrep(self.endpoint_addr)}]"
@property
def online(self):
if not self._online or not self.owner: return false
else: return self.owner.online
@online.setter
def online(self, value):
self._online = value
def process_incoming(self, data, endpoint_addr=None):
if self.online and self.owner.online:
if self.online:
data_hash = RNS.Identity.full_hash(data)
deque_hit = False
if data_hash in self.owner.mif_deque:
@ -879,7 +1057,7 @@ class WeaveInterfacePeer(Interface):
RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
def detach(self):
self.online = False
self._online = False
self.detached = True
def teardown(self):
@ -888,10 +1066,9 @@ class WeaveInterfacePeer(Interface):
if RNS.Reticulum.panic_on_interface_error:
RNS.panic()
else:
RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE)
else: RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE)
self.online = False
self._online = False
self.OUT = False
self.IN = False

View file

@ -805,6 +805,7 @@ class Reticulum:
except Exception as e:
RNS.log("The interface \""+name+"\" could not be created. Check your configuration file for errors!", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
RNS.panic()
else:
RNS.log("The interface name \""+name+"\" was already used. Check your configuration file for errors!", RNS.LOG_ERROR)
@ -1026,6 +1027,20 @@ class Reticulum:
if hasattr(interface, "r_noise_floor"):
ifstats["noise_floor"] = interface.r_noise_floor
if hasattr(interface, "cpu_load"):
ifstats["cpu_load"] = interface.cpu_load
if hasattr(interface, "mem_load"):
ifstats["mem_load"] = interface.mem_load
if hasattr(interface, "switch_id"):
if interface.switch_id != None: ifstats["switch_id"] = RNS.hexrep(interface.switch_id)
else: ifstats["switch_id"] = None
if hasattr(interface, "endpoint_id"):
if interface.endpoint_id != None: ifstats["endpoint_id"] = RNS.hexrep(interface.endpoint_id)
else: ifstats["endpoint_id"] = None
if hasattr(interface, "r_battery_state"):
if interface.r_battery_state != 0x00:
ifstats["battery_state"] = interface.get_battery_state_string()

View file

@ -263,6 +263,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
name.startswith("TCPInterface[Client") or
name.startswith("BackboneInterface[Client on") or
name.startswith("AutoInterfacePeer[") or
name.startswith("WeaveInterfacePeer[") or
name.startswith("I2PInterfacePeer[Connected peer") or
(name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False))
):
@ -339,6 +340,14 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
else:
print(" Noise Fl. : Unknown")
if "cpu_load" in ifstat:
if ifstat["cpu_load"] != None: print(" CPU load : {v} %".format(v=str(ifstat["cpu_load"])))
else: print(" CPU load : Unknown")
if "mem_load" in ifstat:
if ifstat["cpu_load"] != None: print(" Mem usage : {v} %".format(v=str(ifstat["mem_load"])))
else: print(" Mem usage : Unknown")
if "battery_percent" in ifstat and ifstat["battery_percent"] != None:
try:
bpi = int(ifstat["battery_percent"])
@ -353,6 +362,14 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
if "channel_load_short" in ifstat and "channel_load_long" in ifstat:
print(" Ch. Load : {ats}% (15s), {atl}% (1h)".format(ats=str(ifstat["channel_load_short"]),atl=str(ifstat["channel_load_long"])))
if "switch_id" in ifstat:
if ifstat["switch_id"] != None: print(" Switch ID : {v}".format(v=str(ifstat["switch_id"])))
else: print(" Switch ID : Unknown")
if "endpoint_id" in ifstat:
if ifstat["endpoint_id"] != None: print(" Endpoint : {v}".format(v=str(ifstat["endpoint_id"])))
else: print(" Endpoint : Unknown")
if "peers" in ifstat and ifstat["peers"] != None:
print(" Peers : {np} reachable".format(np=ifstat["peers"]))