From b408699e651c8d17b512b39b00fc807f65a1f5d1 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 21 Apr 2026 13:21:23 +0200 Subject: [PATCH] Periodically clean known destinations data based on local relevance --- RNS/Destination.py | 4 +- RNS/Identity.py | 93 +++++++++++++++++++++++++++++++++------------- RNS/Transport.py | 12 ++++++ 3 files changed, 80 insertions(+), 29 deletions(-) diff --git a/RNS/Destination.py b/RNS/Destination.py index a81021c..8729685 100755 --- a/RNS/Destination.py +++ b/RNS/Destination.py @@ -411,9 +411,7 @@ class Destination: else: if packet.packet_type == RNS.Packet.DATA: if self.callbacks.packet != None: - try: - def job(): self.callbacks.packet(plaintext, packet) - threading.Thread(target=job, daemon=True).start() + try: self.callbacks.packet(plaintext, packet) except Exception as e: RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) diff --git a/RNS/Identity.py b/RNS/Identity.py index d9fed96..bfadea1 100644 --- a/RNS/Identity.py +++ b/RNS/Identity.py @@ -102,8 +102,14 @@ class Identity: raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.", RNS.LOG_ERROR) else: with Identity.known_destinations_lock: - Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data, 0] - + if not destination_hash in Identity.known_destinations: + Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data, 0] + else: + entry = Identity.known_destinations[destination_hash] + entry[0] = time.time() + entry[1] = packet_hash + entry[2] = public_key + entry[3] = app_data @staticmethod def recall(target_hash, from_identity_hash=False, _no_use=False): @@ -165,7 +171,7 @@ class Identity: else: return None @staticmethod - def save_known_destinations(background=False): + def save_known_destinations(background=False, recombine=True): # TODO: Improve the storage method so we don't have to # deserialize and serialize the entire table on every # save, but the only changes. It might be possible to @@ -186,24 +192,25 @@ class Identity: Identity.saving_known_destinations = True save_start = time.time() - storage_known_destinations = {} - if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"): + if recombine: + storage_known_destinations = {} + if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"): + try: + with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file: + storage_known_destinations = umsgpack.load(file) + + except: pass + try: - with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file: - storage_known_destinations = umsgpack.load(file) - - except: pass + for destination_hash in storage_known_destinations: + if not destination_hash in Identity.known_destinations: + with Identity.known_destinations_lock: + Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash] + + except Exception as e: + RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING) - try: - for destination_hash in storage_known_destinations: - if not destination_hash in Identity.known_destinations: - with Identity.known_destinations_lock: - Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash] - - except Exception as e: - RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING) - - RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_DEBUG) + RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_VERBOSE) with open(RNS.Reticulum.storagepath+"/known_destinations","wb") as file: umsgpack.dump(Identity.known_destinations.copy(), file) @@ -211,7 +218,7 @@ class Identity: if save_time < 1: time_str = str(round(save_time*1000,2))+"ms" else: time_str = str(round(save_time,2))+"s" - RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_DEBUG) + RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_VERBOSE) except Exception as e: RNS.log("Error while saving known destinations to disk, the contained exception was: "+str(e), RNS.LOG_ERROR) @@ -275,20 +282,54 @@ class Identity: @staticmethod def clean_known_destinations(): - st = time.time() + now = time.time() + st = now total = len(Identity.known_destinations) + stale = [] no_path = 0 retained = 0 never_used = 0 for destination_hash in Identity.known_destinations: try: - if not RNS.Transport.has_path(destination_hash): no_path += 1 - if Identity.known_destinations[destination_hash][4] == 0: never_used += 1 - elif Identity.known_destinations[destination_hash][4] == -1: retained += 1 + if RNS.Transport.has_path(destination_hash): has_path = True + else: + has_path = False + no_path += 1 - except Exception as e: RNS.log(f"Faulty entry for {RNS.prettyhexrep(destination_hash)}") + with Identity.known_destinations_lock: + if destination_hash in Identity.known_destinations: + last_announce = Identity.known_destinations[destination_hash][0] + last_use = 0 + was_used = False + is_retained = False - RNS.log(f"Total destinations: {total}, no path: {no_path}, never used: {never_used}, with path: {total-no_path}, used: {total-never_used}, retained: {retained}. Completed in {RNS.prettyshorttime(time.time()-st)}") + if Identity.known_destinations[destination_hash][4] > 0: + was_used = True + last_use = Identity.known_destinations[destination_hash][4] + + elif Identity.known_destinations[destination_hash][4] == 0: + was_used = False + never_used += 1 + + elif Identity.known_destinations[destination_hash][4] == -1: + is_retained = True + retained += 1 + + if not is_retained and not has_path: + if never_used and now - last_announce > RNS.Transport.UNUSED_DESTINATION_LINGER: stale.append(destination_hash) + elif now - last_use > RNS.Transport.DESTINATION_TIMEOUT*1.25: stale.append(destination_hash) + + except Exception as e: RNS.log(f"Faulty entry for {RNS.prettyhexrep(destination_hash)} while cleaning known destinations: {e}", RNS.LOG_DEBUG) + + removed = 0 + for destination_hash in stale: + with Identity.known_destinations_lock: + if destination_hash in Identity.known_destinations: + Identity.known_destinations.pop(destination_hash) + removed += 1 + + # RNS.log(f"Total destinations: {total}, stale: {len(stale)}, removed: {removed}, no path: {no_path}, never used: {never_used}, with path: {total-no_path}, used: {total-never_used}, retained: {retained}. Completed in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_WARNING) # TODO: Remove + if not RNS.Transport.owner.is_connected_to_shared_instance: Identity.save_known_destinations(recombine=False) @staticmethod def full_hash(data): diff --git a/RNS/Transport.py b/RNS/Transport.py index b858d8d..e0b3546 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -87,6 +87,7 @@ class Transport: LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25 REVERSE_TIMEOUT = 8*60 # Reverse table entries are removed after 8 minutes DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week + UNUSED_DESTINATION_LINGER = 6*60 # Linger time for pathless and never used destinations TUNNEL_TIMEOUT = 60*60*8 # Tunnel table entries are removed if unused for eight hours TUNNEL_PATH_TIMEOUT = 60*60*8 # Tunnel path table entries are removed if unused for eight hours MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of @@ -177,6 +178,8 @@ class Transport: pending_prs_check_interval = 30.0 cache_last_cleaned = 0.0 cache_clean_interval = 5*60 + destinations_last_cleaned = 0.0 + known_destinations_interval = 5*60 tables_last_culled = 0.0 tables_cull_interval = 5.0 interface_last_jobs = 0.0 @@ -263,6 +266,9 @@ class Transport: # Defer cleaning packet cache for 60 seconds Transport.cache_last_cleaned = time.time() + 60 + + # Defer cleaning known destinations + Transport.destinations_last_cleaned = time.time() # Defer sending management announces for 15 seconds Transport.last_mgmt_announce = time.time() - Transport.mgmt_announce_interval + 15 @@ -897,6 +903,12 @@ class Transport: def job(): Transport.clean_cache() threading.Thread(target=job, daemon=True).start() + # Clean known destinations + if time.time() > Transport.destinations_last_cleaned+Transport.known_destinations_interval: + Transport.destinations_last_cleaned = time.time() + def job(): RNS.Identity.clean_known_destinations() + threading.Thread(target=job, daemon=True).start() + # Send announces for management destinations if time.time() > Transport.last_mgmt_announce+Transport.mgmt_announce_interval: try: