Improved transport tunnel handling. Improved memory consumption. Fixed disk I/O bound thread execution time starvation on cache management jobs.

This commit is contained in:
Mark Qvist 2026-04-17 00:07:07 +02:00
commit c6778e4e29
4 changed files with 93 additions and 46 deletions

View file

@ -160,7 +160,7 @@ class Identity:
return None
@staticmethod
def save_known_destinations():
def save_known_destinations(background=False):
# TODO: Improve the storage method so we don't have to
# deserialize and serialize the entire table on every
# save, but the only changes. It might be possible to
@ -491,9 +491,9 @@ class Identity:
return False
@staticmethod
def persist_data():
def persist_data(background=False):
if not RNS.Transport.owner.is_connected_to_shared_instance:
Identity.save_known_destinations()
Identity.save_known_destinations(background=background)
@staticmethod
def exit_handler():

View file

@ -293,7 +293,7 @@ class Packet:
if RNS.Transport.outbound(self): return self.receipt
else:
RNS.log("No interfaces could process the outbound packet", RNS.LOG_WARNING)
RNS.log("No interfaces could process the outbound packet", RNS.LOG_DEBUG)
self.sent = False
self.receipt = None
return False

View file

@ -361,11 +361,11 @@ class Reticulum:
now = time.time()
if now > self.last_cache_clean+Reticulum.CLEAN_INTERVAL:
self.__clean_caches()
self.__clean_caches(background=True)
self.last_cache_clean = time.time()
if now > self.last_data_persist+Reticulum.PERSIST_INTERVAL:
self.__persist_data()
self.__persist_data(background=True)
time.sleep(Reticulum.JOB_INTERVAL)
@ -993,16 +993,16 @@ class Reticulum:
RNS.Transport.interfaces.append(interface)
interface.final_init()
def _should_persist_data(self):
def _should_persist_data(self, background=False):
if time.time() > self.last_data_persist+Reticulum.GRACIOUS_PERSIST_INTERVAL:
self.__persist_data()
self.__persist_data(background=background)
def __persist_data(self):
RNS.Transport.persist_data()
RNS.Identity.persist_data()
def __persist_data(self, background=False):
RNS.Transport.persist_data(background=background)
RNS.Identity.persist_data(background=background)
self.last_data_persist = time.time()
def __clean_caches(self):
def __clean_caches(self, background=False):
RNS.log("Cleaning resource and packet caches...", RNS.LOG_EXTREME)
now = time.time()
@ -1013,8 +1013,8 @@ class Reticulum:
filepath = self.resourcepath + "/" + filename
mtime = os.path.getmtime(filepath)
age = now - mtime
if age > Reticulum.RESOURCE_CACHE:
os.unlink(filepath)
if age > Reticulum.RESOURCE_CACHE: os.unlink(filepath)
if background: time.sleep(0.001)
except Exception as e:
RNS.log("Error while cleaning resources cache, the contained exception was: "+str(e), RNS.LOG_ERROR)
@ -1026,8 +1026,8 @@ class Reticulum:
filepath = self.cachepath + "/" + filename
mtime = os.path.getmtime(filepath)
age = now - mtime
if age > RNS.Transport.DESTINATION_TIMEOUT:
os.unlink(filepath)
if age > RNS.Transport.DESTINATION_TIMEOUT: os.unlink(filepath)
if background: time.sleep(0.001)
except Exception as e:
RNS.log("Error while cleaning resources cache, the contained exception was: "+str(e), RNS.LOG_ERROR)

View file

@ -87,6 +87,8 @@ class Transport:
LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25
REVERSE_TIMEOUT = 8*60 # Reverse table entries are removed after 8 minutes
DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week
TUNNEL_TIMEOUT = 60*60*8 # Tunnel table entries are removed if unused for eight hours
TUNNEL_PATH_TIMEOUT = 60*60*8 # Tunnel path table entries are removed if unused for eight hours
MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of
MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination
PERSIST_RANDOM_BLOBS = 32 # Maximum number of random blobs per destination to persist to disk
@ -137,6 +139,7 @@ class Transport:
pending_local_prs_lock = Lock()
path_states_lock = Lock()
jobs_lock = Lock()
cache_clean_lock = Lock()
# Transport control destinations are used
# for control purposes like path requests
@ -362,8 +365,8 @@ class Transport:
tunnel = [tunnel_id, None, tunnel_paths, expires]
with Transport.tunnels_lock: Transport.tunnels[tunnel_id] = tunnel
if len(Transport.path_table) == 1: specifier = "entry"
else: specifier = "entries"
if len(Transport.tunnels) == 1: specifier = "entry"
else: specifier = "entries"
RNS.log("Loaded "+str(len(Transport.tunnels))+" tunnel table "+specifier+" from storage", RNS.LOG_VERBOSE)
gc.collect()
@ -762,9 +765,12 @@ class Transport:
tunnel_entry = Transport.tunnels[tunnel_id]
expires = tunnel_entry[IDX_TT_EXPIRES]
if time.time() > expires:
stale_tunnels.append(tunnel_id)
should_collect = True
if expires > time.time() + Transport.TUNNEL_TIMEOUT*2:
stale_tunnels.append(tunnel_id); should_collect = True
RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" with excessive expiry was removed", RNS.LOG_EXTREME)
elif time.time() > expires:
stale_tunnels.append(tunnel_id); should_collect = True
RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME)
else:
@ -777,11 +783,25 @@ class Transport:
for tunnel_path in tunnel_paths:
tunnel_path_entry = tunnel_paths[tunnel_path]
if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT:
stale_tunnel_paths.append(tunnel_path)
should_collect = True
if time.time() > tunnel_path_entry[0] + Transport.TUNNEL_PATH_TIMEOUT:
stale_tunnel_paths.append(tunnel_path); should_collect = True
RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" timed out and was removed", RNS.LOG_EXTREME)
else:
active_path = None
with Transport.path_table_lock:
if tunnel_path in Transport.path_table: active_path = Transport.path_table[tunnel_path]
if active_path:
random_blobs = tunnel_path_entry[4]
current_random_blobs = active_path[IDX_PT_RANDBLOBS]
current_path_timebase = Transport.timebase_from_random_blobs(current_random_blobs)
tunnel_announce_timebase = Transport.timebase_from_random_blobs(random_blobs)
if current_path_timebase > tunnel_announce_timebase:
stale_tunnel_paths.append(tunnel_path); should_collect = True
RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" was removed due to more recent active path", RNS.LOG_EXTREME)
for tunnel_path in stale_tunnel_paths:
tunnel_paths.pop(tunnel_path)
ti += 1
@ -864,7 +884,9 @@ class Transport:
# Clean packet caches
if time.time() > Transport.cache_last_cleaned+Transport.cache_clean_interval:
Transport.clean_cache()
Transport.cache_last_cleaned = time.time()
def job(): Transport.clean_cache()
threading.Thread(target=job, daemon=True).start()
# Send announces for management destinations
if time.time() > Transport.last_mgmt_announce+Transport.mgmt_announce_interval:
@ -1869,12 +1891,15 @@ class Transport:
# announce to the tunnels table
if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None:
with Transport.tunnels_lock:
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
paths = tunnel_entry[IDX_TT_PATHS]
paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash]
expires = time.time() + Transport.DESTINATION_TIMEOUT
tunnel_entry[IDX_TT_EXPIRES] = expires
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG)
if not packet.receiving_interface.tunnel_id in Transport.tunnels:
RNS.log(f"Tunnel ID for {packet.receiving_interface} was not found in tunnel table", RNS.LOG_WARNING)
else:
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
paths = tunnel_entry[IDX_TT_PATHS]
paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash]
expires = time.time() + Transport.TUNNEL_TIMEOUT
tunnel_entry[IDX_TT_EXPIRES] = expires
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG)
# Call externally registered callbacks from apps
# wanting to know when an announce arrives
@ -1946,7 +1971,6 @@ class Transport:
if path_mtu:
if packet.receiving_interface.HW_MTU == None:
RNS.log(f"No next-hop HW MTU, disabling link MTU upgrade", RNS.LOG_DEBUG) # TODO: Remove debug
path_mtu = None
packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]
else:
@ -1954,7 +1978,6 @@ class Transport:
try:
path_mtu = nh_mtu
clamped_mtu = RNS.Link.signalling_bytes(path_mtu, mode)
RNS.log(f"Clamping link MTU to {RNS.prettysize(nh_mtu)}", RNS.LOG_DEBUG) # TODO: Remove debug
packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]+clamped_mtu
except Exception as e:
RNS.log(f"Dropping link request packet to local destination. The contained exception was: {e}", RNS.LOG_WARNING)
@ -2176,7 +2199,7 @@ class Transport:
@staticmethod
def handle_tunnel(tunnel_id, interface):
expires = time.time() + Transport.DESTINATION_TIMEOUT
expires = time.time() + Transport.TUNNEL_TIMEOUT
if not tunnel_id in Transport.tunnels:
RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" established.", RNS.LOG_DEBUG)
paths = {}
@ -2209,7 +2232,12 @@ class Transport:
old_entry = Transport.path_table[destination_hash]
old_hops = old_entry[IDX_PT_HOPS]
old_expires = old_entry[IDX_PT_EXPIRES]
if announce_hops <= old_hops or time.time() > old_expires: should_add = True
if announce_hops <= old_hops or time.time() > old_expires:
current_random_blobs = Transport.path_table[destination_hash][IDX_PT_RANDBLOBS]
current_path_timebase = Transport.timebase_from_random_blobs(current_random_blobs)
tunnel_announce_timebase = Transport.timebase_from_random_blobs(random_blobs)
if tunnel_announce_timebase >= current_path_timebase: should_add = True
else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because existing path is more recent", RNS.LOG_DEBUG)
else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because a newer path with fewer hops exist", RNS.LOG_DEBUG)
else:
@ -2317,8 +2345,22 @@ class Transport:
@staticmethod
def clean_cache():
if not Transport.owner.is_connected_to_shared_instance:
Transport.clean_announce_cache()
Transport.cache_last_cleaned = time.time()
if Transport.cache_clean_lock.locked():
RNS.log(f"Cache clean job still running, postponing until next scheduler interval", RNS.LOG_VERBOSE)
else:
try:
acquired_lock = Transport.cache_clean_lock.acquire(blocking=False)
if acquired_lock:
Transport.clean_announce_cache()
Transport.cache_last_cleaned = time.time()
except Exception as e:
RNS.log(f"An error occurred while launching the cache clean job. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
finally:
if acquired_lock: Transport.cache_clean_lock.release()
@staticmethod
def clean_announce_cache():
@ -2326,7 +2368,7 @@ class Transport:
target_path = os.path.join(RNS.Reticulum.cachepath, "announces")
with Transport.path_table_lock: active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table]
with Transport.tunnels_lock: tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict]))
removed = 0
removed = 0; total = 0
for packet_hash in os.listdir(target_path):
remove = False
full_path = os.path.join(target_path, packet_hash)
@ -2335,6 +2377,10 @@ class Transport:
except: remove = True
if (not target_hash in active_paths) and (not target_hash in tunnel_paths): remove = True
if remove: os.unlink(full_path); removed += 1
total += 1
# Low priority, yield thread
time.sleep(0.001)
if removed > 0: RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
@ -2953,7 +2999,7 @@ class Transport:
return announce_emitted
@staticmethod
def save_packet_hashlist():
def save_packet_hashlist(background=False):
if not Transport.owner.is_connected_to_shared_instance:
if hasattr(Transport, "saving_packet_hashlist"):
wait_interval = 0.2
@ -2990,7 +3036,7 @@ class Transport:
@staticmethod
def save_path_table():
def save_path_table(background=False):
if not Transport.owner.is_connected_to_shared_instance:
if hasattr(Transport, "saving_path_table"):
wait_interval = 0.2
@ -3064,7 +3110,7 @@ class Transport:
@staticmethod
def save_tunnel_table():
def save_tunnel_table(background=False):
if not Transport.owner.is_connected_to_shared_instance:
if hasattr(Transport, "saving_tunnel_table"):
wait_interval = 0.2
@ -3138,10 +3184,10 @@ class Transport:
gc.collect()
@staticmethod
def persist_data():
Transport.save_packet_hashlist()
Transport.save_path_table()
Transport.save_tunnel_table()
def persist_data(background=False):
Transport.save_packet_hashlist(background=background)
Transport.save_path_table(background=background)
Transport.save_tunnel_table(background=background)
@staticmethod
def exit_handler():
@ -3232,7 +3278,8 @@ class Transport:
for destination_hash in drop_destinations:
try:
if destination_hash in Transport.path_table: Transport.path_table.pop(destination_hash)
with Transport.path_table_lock:
if destination_hash in Transport.path_table: Transport.path_table.pop(destination_hash)
except Exception as e:
RNS.log(f"Error while dropping blackhole-associated destination from path table: {e}", RNS.LOG_ERROR)