Implemented on-network global interface discovery

This commit is contained in:
Mark Qvist 2025-12-31 01:07:08 +01:00
commit 9d36ff48dd
9 changed files with 297 additions and 10 deletions

228
RNS/Discovery.py Normal file
View file

@ -0,0 +1,228 @@
import RNS
import time
import threading
from .vendor import umsgpack as msgpack
NAME = 0xFF
INTERFACE_TYPE = 0x00
REACHABLE_ON = 0x01
LATITUDE = 0x02
LONGITUDE = 0x03
HEIGHT = 0x04
PORT = 0x05
IFAC_NETNAME = 0x06
IFAC_NETKEY = 0x07
FREQUENCY = 0x08
BANDWIDTH = 0x09
SPREADINGFACTOR = 0x0A
CODINGRATE = 0x0B
MODULATION = 0x0C
APP_NAME = "rnstransport"
class InterfaceAnnouncer():
JOB_INTERVAL = 60
DEFAULT_STAMP_VALUE = 20
WORKBLOCK_EXPAND_ROUNDS = 20
DISCOVERABLE_INTERFACE_TYPES = ["BackboneInterface", "TCPServerInterface", "TCPClientInterface", "RNodeInterface", "I2PInterface", "KISSInterface"]
def __init__(self, owner):
import importlib.util
if importlib.util.find_spec('LXMF') != None: from LXMF import LXStamper
else:
RNS.log("Using on-network interface discovery requires the LXMF module to be installed.", RNS.LOG_CRITICAL)
RNS.log("You can install it with the command: pip install lxmf", RNS.LOG_CRITICAL)
RNS.panic()
self.owner = owner
self.should_run = False
self.job_interval = self.JOB_INTERVAL
self.stamper = LXStamper
self.stamp_cache = {}
self.discovery_destination = RNS.Destination(self.owner.identity, RNS.Destination.IN, RNS.Destination.SINGLE,
APP_NAME, "discovery", "interface")
def start(self):
if not self.should_run:
self.should_run = True
threading.Thread(target=self.job, daemon=True).start()
def stop(self): self.should_run = False
def job(self):
while self.should_run:
time.sleep(self.job_interval)
try:
now = time.time()
due_interfaces = [i for i in self.owner.interfaces if i.supports_discovery and i.discoverable and now > (i.last_discovery_announce+i.discovery_announce_interval)]
due_interfaces.sort(key=lambda i: now-i.last_discovery_announce, reverse=True)
if len(due_interfaces) > 0:
selected_interface = due_interfaces[0]
selected_interface.last_discovery_announce = time.time()
RNS.log(f"Preparing interface discovery announce for {selected_interface.name}", RNS.LOG_VERBOSE)
app_data = self.get_interface_announce_data(selected_interface)
if not app_data: RNS.log(f"Could not generate interface discovery announce data for {selected_interface.name}", RNS.LOG_ERROR)
else:
RNS.log(f"Sending interface discovery announce for {selected_interface.name}", RNS.LOG_VERBOSE)
self.discovery_destination.announce(app_data=app_data)
except Exception as e:
RNS.log(f"Error while preparing interface discovery announces: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def get_interface_announce_data(self, interface):
interface_type = type(interface).__name__
stamp_value = interface.discovery_stamp_value if interface.discovery_stamp_value else self.DEFAULT_STAMP_VALUE
if not interface_type in self.DISCOVERABLE_INTERFACE_TYPES: return None
else:
info = {INTERFACE_TYPE: interface_type,
NAME: interface.discovery_name,
LATITUDE: interface.discovery_latitude,
LONGITUDE: interface.discovery_longitude,
HEIGHT: interface.discovery_height}
if interface_type in ["BackboneInterface", "TCPServerInterface"]:
info[REACHABLE_ON] = interface.reachable_on
info[PORT] = interface.bind_port
if interface_type == "I2PInterface" and interface.connectable and interface.b32:
info[REACHABLE_ON] = interface.b32
if interface_type == "RNodeInterface":
info[FREQUENCY] = interface.frequency
info[BANDWIDTH] = interface.bandwidth
info[SPREADINGFACTOR] = interface.sf
info[CODINGRATE] = interface.cr
if interface_type == "KISSInterface" or (interface_type == "TCPClientInterface" and interface.kiss_framing):
info[INTERFACE_TYPE] = "KISSInterface"
info[FREQUENCY] = interface.discovery_frequency
info[BANDWIDTH] = interface.discovery_bandwidth
info[MODULATION] = interface.discovery_modulation
if interface.discovery_publish_ifac == True:
info[IFAC_NETNAME] = interface.ifac_netname
info[IFAC_NETKEY] = interface.ifac_netkey
packed = msgpack.packb(info)
infohash = RNS.Identity.full_hash(packed)
if infohash in self.stamp_cache: return packed+self.stamp_cache[infohash]
else: stamp, v = self.stamper.generate_stamp(infohash, stamp_cost=stamp_value, expand_rounds=self.WORKBLOCK_EXPAND_ROUNDS)
if not stamp: return None
else:
self.stamp_cache[infohash] = stamp
return packed+stamp
class InterfaceAnnounceHandler:
def __init__(self, required_value=InterfaceAnnouncer.DEFAULT_STAMP_VALUE, callback=None):
import importlib.util
if importlib.util.find_spec('LXMF') != None: from LXMF import LXStamper
else:
RNS.log("Using on-network interface discovery requires the LXMF module to be installed.", RNS.LOG_CRITICAL)
RNS.log("You can install it with the command: pip install lxmf", RNS.LOG_CRITICAL)
RNS.panic()
self.aspect_filter = APP_NAME+".discovery.interface"
self.required_value = required_value
self.callback = callback
self.stamper = LXStamper
def received_announce(self, destination_hash, announced_identity, app_data):
try:
if app_data and len(app_data) > self.stamper.STAMP_SIZE:
stamp = app_data[-self.stamper.STAMP_SIZE:]
packed = app_data[:-self.stamper.STAMP_SIZE]
infohash = RNS.Identity.full_hash(packed)
workblock = self.stamper.stamp_workblock(infohash, expand_rounds=InterfaceAnnouncer.WORKBLOCK_EXPAND_ROUNDS)
value = self.stamper.stamp_value(workblock, stamp)
if value < self.required_value: RNS.log(f"Ignored discovered interface with stamp value {value}", RNS.LOG_DEBUG)
else:
info = None
unpacked = msgpack.unpackb(packed)
if INTERFACE_TYPE in unpacked:
interface_type = unpacked[INTERFACE_TYPE]
info = {"type": interface_type,
"name": unpacked[NAME] or f"Discovered {interface_type}",
"received": time.time(),
"identity": RNS.hexrep(announced_identity.hash, delimit=False),
"latitude": unpacked[LATITUDE],
"longitude": unpacked[LONGITUDE],
"height": unpacked[HEIGHT]}
if IFAC_NETNAME in unpacked: info["ifac_netname"] = unpacked[IFAC_NETNAME]
if IFAC_NETKEY in unpacked: info["ifac_netkey"] = unpacked[IFAC_NETKEY]
if interface_type in ["BackboneInterface", "TCPServerInterface"]:
backbone_support = not RNS.vendor.platformutils.is_windows()
info["reachable_on"] = unpacked[REACHABLE_ON]
info["port"] = unpacked[PORT]
connection_interface = "BackboneClientInterface" if backbone_support else "TCPClientInterface"
remote_str = "remote" if backbone_support else "target_host"
cfg_name = info["name"]
cfg_remote = info["reachable_on"]
cfg_port = info["port"]
cfg_identity = info["identity"]
cfg_netname = info["ifac_netname"] if "ifac_netname" in info else None
cfg_netkey = info["ifac_netkey"] if "ifac_netkey" in info else None
cfg_netname_str = f"\n network_name = {cfg_netname}" if cfg_netname else ""
cfg_netkey_str = f"\n passphrase = {cfg_netkey}" if cfg_netkey else ""
cfg_identity_str = f"\n transport_identity = {cfg_identity}"
info["config_entry"] = f"[[{cfg_name}]]\n type = {connection_interface}\n enabled = yes\n {remote_str} = {cfg_remote}\n target_port = {cfg_port}{cfg_identity_str}{cfg_netkey_str}{cfg_netkey_str}"
if interface_type == "I2PInterface":
info["reachable_on"] = unpacked[REACHABLE_ON]
cfg_name = info["name"]
cfg_remote = info["reachable_on"]
cfg_identity = info["identity"]
cfg_netname = info["ifac_netname"] if "ifac_netname" in info else None
cfg_netkey = info["ifac_netkey"] if "ifac_netkey" in info else None
cfg_netname_str = f"\n network_name = {cfg_netname}" if cfg_netname else ""
cfg_netkey_str = f"\n passphrase = {cfg_netkey}" if cfg_netkey else ""
cfg_identity_str = f"\n transport_identity = {cfg_identity}"
info["config_entry"] = f"[[{cfg_name}]]\n type = I2PInterface\n enabled = yes\n peers = {cfg_remote}{cfg_identity_str}{cfg_netkey_str}{cfg_netkey_str}"
if interface_type == "RNodeInterface":
info["frequency"] = unpacked[FREQUENCY]
info["bandwidth"] = unpacked[BANDWIDTH]
info["sf"] = unpacked[SPREADINGFACTOR]
info["cr"] = unpacked[CODINGRATE]
cfg_name = info["name"]
cfg_frequency = info["frequency"]
cfg_bandwidth = info["bandwidth"]
cfg_sf = info["sf"]
cfg_cr = info["cr"]
cfg_identity = info["identity"]
cfg_netname = info["ifac_netname"] if "ifac_netname" in info else None
cfg_netkey = info["ifac_netkey"] if "ifac_netkey" in info else None
cfg_netname_str = f"\n network_name = {cfg_netname}" if cfg_netname else ""
cfg_netkey_str = f"\n passphrase = {cfg_netkey}" if cfg_netkey else ""
cfg_identity_str = f"\n transport_identity = {cfg_identity}"
info["config_entry"] = f"[[{cfg_name}]]\n type = RNodeInterface\n enabled = yes\n port = \n frequency = {cfg_frequency}\n bandwidth = {cfg_bandwidth}\n spreadingfactor = {cfg_sf}\n codingrate = {cfg_cr}\n txpower = {cfg_netkey_str}{cfg_netkey_str}"
if interface_type == "KISSInterface":
info["frequency"] = unpacked[FREQUENCY]
info["bandwidth"] = unpacked[BANDWIDTH]
info["modulation"] = unpacked[MODULATION]
cfg_name = info["name"]
cfg_frequency = info["frequency"]
cfg_bandwidth = info["bandwidth"]
cfg_modulation = info["modulation"]
cfg_identity = info["identity"]
cfg_netname = info["ifac_netname"] if "ifac_netname" in info else None
cfg_netkey = info["ifac_netkey"] if "ifac_netkey" in info else None
cfg_netname_str = f"\n network_name = {cfg_netname}" if cfg_netname else ""
cfg_netkey_str = f"\n passphrase = {cfg_netkey}" if cfg_netkey else ""
cfg_identity_str = f"\n transport_identity = {cfg_identity}"
info["config_entry"] = f"[[{cfg_name}]]\n type = KISSInterface\n enabled = yes\n port = \n # Frequency: {cfg_frequency}\n # Bandwidth: {cfg_bandwidth}\n # Modulation: {cfg_modulation}{cfg_identity_str}{cfg_netkey_str}{cfg_netkey_str}"
RNS.log(f"Discovered interface with stamp value {value}: {info}", RNS.LOG_DEBUG)
if self.callback and callable(self.callback): self.callback(info)
except Exception as e:
RNS.log(f"An error occurred while trying to decode discovered interface. The contained exception was: {e}", RNS.LOG_ERROR)

View file

@ -127,6 +127,7 @@ class BackboneInterface(Interface):
self.detached = False
self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL
self.spawned_interfaces = []
self.supports_discovery = True
if bindport == None:
raise SystemError(f"No TCP port configured for interface \"{name}\"")

View file

@ -880,6 +880,7 @@ class I2PInterface(Interface):
self.ifac_size = ifac_size
self.ifac_netname = ifac_netname
self.ifac_netkey = ifac_netkey
self.supports_discovery = True
self.online = False

View file

@ -84,6 +84,9 @@ class Interface:
self.bitrate = 62500
self.HW_MTU = None
self.supports_discovery = False
self.discoverable = False
self.last_discovery_announce = 0
self.parent_interface = None
self.spawned_interfaces = None
self.tunnel_id = None

View file

@ -296,6 +296,7 @@ class RNodeInterface(Interface):
self.flow_control = flow_control
self.interface_ready = False
self.announce_rate_target = None
self.supports_discovery = True
if force_ble or self.ble_addr != None or self.ble_name != None: self.use_ble = True
if force_tcp or self.tcp_host != None: self.use_tcp = True

View file

@ -131,10 +131,9 @@ class TCPClientInterface(Interface):
self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL
self.bitrate = TCPClientInterface.BITRATE_GUESS
if max_reconnect_tries == None:
self.max_reconnect_tries = TCPClientInterface.RECONNECT_MAX_TRIES
else:
self.max_reconnect_tries = max_reconnect_tries
self.supports_discovery = True
if max_reconnect_tries == None: self.max_reconnect_tries = TCPClientInterface.RECONNECT_MAX_TRIES
else: self.max_reconnect_tries = max_reconnect_tries
if connected_socket != None:
self.receives = True
@ -513,6 +512,7 @@ class TCPServerInterface(Interface):
if port != None:
bindport = port
self.supports_discovery = True
self.HW_MTU = TCPInterface.HW_MTU
self.online = False

View file

@ -222,15 +222,12 @@ class Reticulum:
:param configdir: Full path to a Reticulum configuration directory.
"""
if Reticulum.__instance != None:
raise OSError("Attempt to reinitialise Reticulum, when it was already running")
else:
Reticulum.__instance = self
if Reticulum.__instance != None: raise OSError("Attempt to reinitialise Reticulum, when it was already running")
else: Reticulum.__instance = self
RNS.vendor.platformutils.platform_checks()
if configdir != None:
Reticulum.configdir = configdir
if configdir != None: Reticulum.configdir = configdir
else:
if os.path.isdir("/etc/reticulum") and os.path.isfile("/etc/reticulum/config"):
Reticulum.configdir = "/etc/reticulum"
@ -258,6 +255,7 @@ class Reticulum:
Reticulum.__remote_management_enabled = False
Reticulum.__use_implicit_proof = True
Reticulum.__allow_probes = False
Reticulum.__discovery_enabled = False
Reticulum.panic_on_interface_error = False
@ -346,6 +344,8 @@ class Reticulum:
thread.daemon = True
thread.start()
if Reticulum.__discovery_enabled: RNS.Transport.enable_discovery()
atexit.register(Reticulum.exit_handler)
signal.signal(signal.SIGINT, Reticulum.sigint_handler)
signal.signal(signal.SIGTERM, Reticulum.sigterm_handler)
@ -641,6 +641,38 @@ class Reticulum:
if "announce_cap" in c:
if c.as_float("announce_cap") > 0 and c.as_float("announce_cap") <= 100:
announce_cap = c.as_float("announce_cap")/100.0
discoverable = False
discovery_announce_interval = None
discovery_stamp_value = None
discovery_name = None
reachable_on = None
publish_ifac = False
latitude = None
longitude = None
height = None
discovery_frequency = None
discovery_bandwidth = None
discovery_modulation = None
if "discoverable" in c:
discoverable = c.as_bool("discoverable")
if discoverable:
Reticulum.__discovery_enabled = True
if "announce_interval" in c:
discovery_announce_interval = c.as_int("announce_interval")*60
if discovery_announce_interval < 5: discovery_announce_interval = 5*60
if discovery_announce_interval == None: discovery_announce_interval = 6*60*60
if "discovery_stamp_value" in c: latitude = c.as_int("discovery_stamp_value")
if "discovery_name" in c: discovery_name = c["discovery_name"]
if "reachable_on" in c: reachable_on = c["reachable_on"]
if "publish_ifac" in c: publish_ifac = c.as_bool("publish_ifac")
if "latitude" in c: latitude = c.as_float("latitude")
if "longitude" in c: latitude = c.as_float("longitude")
if "height" in c: height = c.as_float("height")
if "discovery_frequency" in c: discovery_frequency = c.as_int("discovery_frequency")
if "discovery_bandwidth" in c: discovery_bandwidth = c.as_int("discovery_bandwidth")
if "discovery_modulation" in c: discovery_modulation = c.as_int("discovery_modulation")
try:
def interface_post_init(interface):
@ -658,6 +690,19 @@ class Reticulum:
if ifac_size != None: interface.ifac_size = ifac_size
else: interface.ifac_size = interface.DEFAULT_IFAC_SIZE
interface.discoverable = discoverable
interface.discovery_announce_interval = discovery_announce_interval
interface.discovery_publish_ifac = publish_ifac
interface.reachable_on = reachable_on
interface.discovery_name = discovery_name
interface.discovery_stamp_value = discovery_stamp_value
interface.discovery_latitude = latitude
interface.discovery_longitude = longitude
interface.discovery_height = height
interface.discovery_frequency = discovery_frequency
interface.discovery_bandwidth = discovery_bandwidth
interface.discovery_modulation = discovery_modulation
interface.announce_rate_target = announce_rate_target
interface.announce_rate_grace = announce_rate_grace
interface.announce_rate_penalty = announce_rate_penalty

View file

@ -156,6 +156,7 @@ class Transport:
interface_last_jobs = 0.0
interface_jobs_interval = 5.0
inbound_announce_lock = Lock()
interface_announcer = None
traffic_rxb = 0
traffic_txb = 0
@ -349,6 +350,12 @@ class Transport:
try: Transport.interfaces.sort(key=lambda interface: interface.bitrate, reverse=True)
except Exception as e: RNS.log(f"Could not prioritize interfaces according to bitrate. The contained exception was: {e}", RNS.LOG_ERROR)
@staticmethod
def enable_discovery():
if not Transport.interface_announcer:
Transport.interface_announcer = RNS.Discovery.InterfaceAnnouncer(Transport)
Transport.interface_announcer.start()
@staticmethod
def count_traffic_loop():
while True:

View file

@ -44,6 +44,7 @@ from .Link import Link, RequestReceipt
from .Channel import MessageBase
from .Buffer import Buffer, RawChannelReader, RawChannelWriter
from .Transport import Transport
from .Discovery import InterfaceAnnouncer
from .Destination import Destination
from .Packet import Packet
from .Packet import PacketReceipt