From c6778e4e29f6a2dfd04dd0f526f75f0750e94faf Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 17 Apr 2026 00:07:07 +0200 Subject: [PATCH] Improved transport tunnel handling. Improved memory consumption. Fixed disk I/O bound thread execution time starvation on cache management jobs. --- RNS/Identity.py | 6 +-- RNS/Packet.py | 2 +- RNS/Reticulum.py | 24 +++++------ RNS/Transport.py | 107 ++++++++++++++++++++++++++++++++++------------- 4 files changed, 93 insertions(+), 46 deletions(-) diff --git a/RNS/Identity.py b/RNS/Identity.py index 600ade1..58e4a09 100644 --- a/RNS/Identity.py +++ b/RNS/Identity.py @@ -160,7 +160,7 @@ class Identity: return None @staticmethod - def save_known_destinations(): + def save_known_destinations(background=False): # TODO: Improve the storage method so we don't have to # deserialize and serialize the entire table on every # save, but the only changes. It might be possible to @@ -491,9 +491,9 @@ class Identity: return False @staticmethod - def persist_data(): + def persist_data(background=False): if not RNS.Transport.owner.is_connected_to_shared_instance: - Identity.save_known_destinations() + Identity.save_known_destinations(background=background) @staticmethod def exit_handler(): diff --git a/RNS/Packet.py b/RNS/Packet.py index c153190..5363826 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -293,7 +293,7 @@ class Packet: if RNS.Transport.outbound(self): return self.receipt else: - RNS.log("No interfaces could process the outbound packet", RNS.LOG_WARNING) + RNS.log("No interfaces could process the outbound packet", RNS.LOG_DEBUG) self.sent = False self.receipt = None return False diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 9792b4a..29034bb 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -361,11 +361,11 @@ class Reticulum: now = time.time() if now > self.last_cache_clean+Reticulum.CLEAN_INTERVAL: - self.__clean_caches() + self.__clean_caches(background=True) self.last_cache_clean = time.time() if now > self.last_data_persist+Reticulum.PERSIST_INTERVAL: - self.__persist_data() + self.__persist_data(background=True) time.sleep(Reticulum.JOB_INTERVAL) @@ -993,16 +993,16 @@ class Reticulum: RNS.Transport.interfaces.append(interface) interface.final_init() - def _should_persist_data(self): + def _should_persist_data(self, background=False): if time.time() > self.last_data_persist+Reticulum.GRACIOUS_PERSIST_INTERVAL: - self.__persist_data() + self.__persist_data(background=background) - def __persist_data(self): - RNS.Transport.persist_data() - RNS.Identity.persist_data() + def __persist_data(self, background=False): + RNS.Transport.persist_data(background=background) + RNS.Identity.persist_data(background=background) self.last_data_persist = time.time() - def __clean_caches(self): + def __clean_caches(self, background=False): RNS.log("Cleaning resource and packet caches...", RNS.LOG_EXTREME) now = time.time() @@ -1013,8 +1013,8 @@ class Reticulum: filepath = self.resourcepath + "/" + filename mtime = os.path.getmtime(filepath) age = now - mtime - if age > Reticulum.RESOURCE_CACHE: - os.unlink(filepath) + if age > Reticulum.RESOURCE_CACHE: os.unlink(filepath) + if background: time.sleep(0.001) except Exception as e: RNS.log("Error while cleaning resources cache, the contained exception was: "+str(e), RNS.LOG_ERROR) @@ -1026,8 +1026,8 @@ class Reticulum: filepath = self.cachepath + "/" + filename mtime = os.path.getmtime(filepath) age = now - mtime - if age > RNS.Transport.DESTINATION_TIMEOUT: - os.unlink(filepath) + if age > RNS.Transport.DESTINATION_TIMEOUT: os.unlink(filepath) + if background: time.sleep(0.001) except Exception as e: RNS.log("Error while cleaning resources cache, the contained exception was: "+str(e), RNS.LOG_ERROR) diff --git a/RNS/Transport.py b/RNS/Transport.py index 0505d29..79cfd6c 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -87,6 +87,8 @@ class Transport: LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25 REVERSE_TIMEOUT = 8*60 # Reverse table entries are removed after 8 minutes DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week + TUNNEL_TIMEOUT = 60*60*8 # Tunnel table entries are removed if unused for eight hours + TUNNEL_PATH_TIMEOUT = 60*60*8 # Tunnel path table entries are removed if unused for eight hours MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination PERSIST_RANDOM_BLOBS = 32 # Maximum number of random blobs per destination to persist to disk @@ -137,6 +139,7 @@ class Transport: pending_local_prs_lock = Lock() path_states_lock = Lock() jobs_lock = Lock() + cache_clean_lock = Lock() # Transport control destinations are used # for control purposes like path requests @@ -362,8 +365,8 @@ class Transport: tunnel = [tunnel_id, None, tunnel_paths, expires] with Transport.tunnels_lock: Transport.tunnels[tunnel_id] = tunnel - if len(Transport.path_table) == 1: specifier = "entry" - else: specifier = "entries" + if len(Transport.tunnels) == 1: specifier = "entry" + else: specifier = "entries" RNS.log("Loaded "+str(len(Transport.tunnels))+" tunnel table "+specifier+" from storage", RNS.LOG_VERBOSE) gc.collect() @@ -762,9 +765,12 @@ class Transport: tunnel_entry = Transport.tunnels[tunnel_id] expires = tunnel_entry[IDX_TT_EXPIRES] - if time.time() > expires: - stale_tunnels.append(tunnel_id) - should_collect = True + if expires > time.time() + Transport.TUNNEL_TIMEOUT*2: + stale_tunnels.append(tunnel_id); should_collect = True + RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" with excessive expiry was removed", RNS.LOG_EXTREME) + + elif time.time() > expires: + stale_tunnels.append(tunnel_id); should_collect = True RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME) else: @@ -777,11 +783,25 @@ class Transport: for tunnel_path in tunnel_paths: tunnel_path_entry = tunnel_paths[tunnel_path] - if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT: - stale_tunnel_paths.append(tunnel_path) - should_collect = True + if time.time() > tunnel_path_entry[0] + Transport.TUNNEL_PATH_TIMEOUT: + stale_tunnel_paths.append(tunnel_path); should_collect = True RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" timed out and was removed", RNS.LOG_EXTREME) + else: + active_path = None + with Transport.path_table_lock: + if tunnel_path in Transport.path_table: active_path = Transport.path_table[tunnel_path] + + if active_path: + random_blobs = tunnel_path_entry[4] + current_random_blobs = active_path[IDX_PT_RANDBLOBS] + current_path_timebase = Transport.timebase_from_random_blobs(current_random_blobs) + tunnel_announce_timebase = Transport.timebase_from_random_blobs(random_blobs) + + if current_path_timebase > tunnel_announce_timebase: + stale_tunnel_paths.append(tunnel_path); should_collect = True + RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" was removed due to more recent active path", RNS.LOG_EXTREME) + for tunnel_path in stale_tunnel_paths: tunnel_paths.pop(tunnel_path) ti += 1 @@ -864,7 +884,9 @@ class Transport: # Clean packet caches if time.time() > Transport.cache_last_cleaned+Transport.cache_clean_interval: - Transport.clean_cache() + Transport.cache_last_cleaned = time.time() + def job(): Transport.clean_cache() + threading.Thread(target=job, daemon=True).start() # Send announces for management destinations if time.time() > Transport.last_mgmt_announce+Transport.mgmt_announce_interval: @@ -1869,12 +1891,15 @@ class Transport: # announce to the tunnels table if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None: with Transport.tunnels_lock: - tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] - paths = tunnel_entry[IDX_TT_PATHS] - paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash] - expires = time.time() + Transport.DESTINATION_TIMEOUT - tunnel_entry[IDX_TT_EXPIRES] = expires - RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) + if not packet.receiving_interface.tunnel_id in Transport.tunnels: + RNS.log(f"Tunnel ID for {packet.receiving_interface} was not found in tunnel table", RNS.LOG_WARNING) + else: + tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] + paths = tunnel_entry[IDX_TT_PATHS] + paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash] + expires = time.time() + Transport.TUNNEL_TIMEOUT + tunnel_entry[IDX_TT_EXPIRES] = expires + RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) # Call externally registered callbacks from apps # wanting to know when an announce arrives @@ -1946,7 +1971,6 @@ class Transport: if path_mtu: if packet.receiving_interface.HW_MTU == None: - RNS.log(f"No next-hop HW MTU, disabling link MTU upgrade", RNS.LOG_DEBUG) # TODO: Remove debug path_mtu = None packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE] else: @@ -1954,7 +1978,6 @@ class Transport: try: path_mtu = nh_mtu clamped_mtu = RNS.Link.signalling_bytes(path_mtu, mode) - RNS.log(f"Clamping link MTU to {RNS.prettysize(nh_mtu)}", RNS.LOG_DEBUG) # TODO: Remove debug packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]+clamped_mtu except Exception as e: RNS.log(f"Dropping link request packet to local destination. The contained exception was: {e}", RNS.LOG_WARNING) @@ -2176,7 +2199,7 @@ class Transport: @staticmethod def handle_tunnel(tunnel_id, interface): - expires = time.time() + Transport.DESTINATION_TIMEOUT + expires = time.time() + Transport.TUNNEL_TIMEOUT if not tunnel_id in Transport.tunnels: RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" established.", RNS.LOG_DEBUG) paths = {} @@ -2209,7 +2232,12 @@ class Transport: old_entry = Transport.path_table[destination_hash] old_hops = old_entry[IDX_PT_HOPS] old_expires = old_entry[IDX_PT_EXPIRES] - if announce_hops <= old_hops or time.time() > old_expires: should_add = True + if announce_hops <= old_hops or time.time() > old_expires: + current_random_blobs = Transport.path_table[destination_hash][IDX_PT_RANDBLOBS] + current_path_timebase = Transport.timebase_from_random_blobs(current_random_blobs) + tunnel_announce_timebase = Transport.timebase_from_random_blobs(random_blobs) + if tunnel_announce_timebase >= current_path_timebase: should_add = True + else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because existing path is more recent", RNS.LOG_DEBUG) else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because a newer path with fewer hops exist", RNS.LOG_DEBUG) else: @@ -2317,8 +2345,22 @@ class Transport: @staticmethod def clean_cache(): if not Transport.owner.is_connected_to_shared_instance: - Transport.clean_announce_cache() - Transport.cache_last_cleaned = time.time() + if Transport.cache_clean_lock.locked(): + RNS.log(f"Cache clean job still running, postponing until next scheduler interval", RNS.LOG_VERBOSE) + + else: + try: + acquired_lock = Transport.cache_clean_lock.acquire(blocking=False) + if acquired_lock: + Transport.clean_announce_cache() + Transport.cache_last_cleaned = time.time() + + except Exception as e: + RNS.log(f"An error occurred while launching the cache clean job. The contained exception was: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + finally: + if acquired_lock: Transport.cache_clean_lock.release() @staticmethod def clean_announce_cache(): @@ -2326,7 +2368,7 @@ class Transport: target_path = os.path.join(RNS.Reticulum.cachepath, "announces") with Transport.path_table_lock: active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table] with Transport.tunnels_lock: tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict])) - removed = 0 + removed = 0; total = 0 for packet_hash in os.listdir(target_path): remove = False full_path = os.path.join(target_path, packet_hash) @@ -2335,6 +2377,10 @@ class Transport: except: remove = True if (not target_hash in active_paths) and (not target_hash in tunnel_paths): remove = True if remove: os.unlink(full_path); removed += 1 + total += 1 + + # Low priority, yield thread + time.sleep(0.001) if removed > 0: RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG) @@ -2953,7 +2999,7 @@ class Transport: return announce_emitted @staticmethod - def save_packet_hashlist(): + def save_packet_hashlist(background=False): if not Transport.owner.is_connected_to_shared_instance: if hasattr(Transport, "saving_packet_hashlist"): wait_interval = 0.2 @@ -2990,7 +3036,7 @@ class Transport: @staticmethod - def save_path_table(): + def save_path_table(background=False): if not Transport.owner.is_connected_to_shared_instance: if hasattr(Transport, "saving_path_table"): wait_interval = 0.2 @@ -3064,7 +3110,7 @@ class Transport: @staticmethod - def save_tunnel_table(): + def save_tunnel_table(background=False): if not Transport.owner.is_connected_to_shared_instance: if hasattr(Transport, "saving_tunnel_table"): wait_interval = 0.2 @@ -3138,10 +3184,10 @@ class Transport: gc.collect() @staticmethod - def persist_data(): - Transport.save_packet_hashlist() - Transport.save_path_table() - Transport.save_tunnel_table() + def persist_data(background=False): + Transport.save_packet_hashlist(background=background) + Transport.save_path_table(background=background) + Transport.save_tunnel_table(background=background) @staticmethod def exit_handler(): @@ -3232,7 +3278,8 @@ class Transport: for destination_hash in drop_destinations: try: - if destination_hash in Transport.path_table: Transport.path_table.pop(destination_hash) + with Transport.path_table_lock: + if destination_hash in Transport.path_table: Transport.path_table.pop(destination_hash) except Exception as e: RNS.log(f"Error while dropping blackhole-associated destination from path table: {e}", RNS.LOG_ERROR)