Periodically clean known-destinations data based on local relevance

This commit is contained in:
Mark Qvist 2026-04-21 13:21:23 +02:00
commit b408699e65
3 changed files with 80 additions and 29 deletions

View file

@ -411,9 +411,7 @@ class Destination:
else:
if packet.packet_type == RNS.Packet.DATA:
if self.callbacks.packet != None:
try:
def job(): self.callbacks.packet(plaintext, packet)
threading.Thread(target=job, daemon=True).start()
try: self.callbacks.packet(plaintext, packet)
except Exception as e:
RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

View file

@ -102,8 +102,14 @@ class Identity:
raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.", RNS.LOG_ERROR)
else:
with Identity.known_destinations_lock:
Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data, 0]
if not destination_hash in Identity.known_destinations:
Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data, 0]
else:
entry = Identity.known_destinations[destination_hash]
entry[0] = time.time()
entry[1] = packet_hash
entry[2] = public_key
entry[3] = app_data
@staticmethod
def recall(target_hash, from_identity_hash=False, _no_use=False):
@ -165,7 +171,7 @@ class Identity:
else: return None
@staticmethod
def save_known_destinations(background=False):
def save_known_destinations(background=False, recombine=True):
# TODO: Improve the storage method so we don't have to
# deserialize and serialize the entire table on every
# save, but the only changes. It might be possible to
@ -186,24 +192,25 @@ class Identity:
Identity.saving_known_destinations = True
save_start = time.time()
storage_known_destinations = {}
if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"):
if recombine:
storage_known_destinations = {}
if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"):
try:
with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file:
storage_known_destinations = umsgpack.load(file)
except: pass
try:
with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file:
storage_known_destinations = umsgpack.load(file)
except: pass
for destination_hash in storage_known_destinations:
if not destination_hash in Identity.known_destinations:
with Identity.known_destinations_lock:
Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]
except Exception as e:
RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING)
try:
for destination_hash in storage_known_destinations:
if not destination_hash in Identity.known_destinations:
with Identity.known_destinations_lock:
Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]
except Exception as e:
RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING)
RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_DEBUG)
RNS.log("Saving "+str(len(Identity.known_destinations))+" known destinations to storage...", RNS.LOG_VERBOSE)
with open(RNS.Reticulum.storagepath+"/known_destinations","wb") as file:
umsgpack.dump(Identity.known_destinations.copy(), file)
@ -211,7 +218,7 @@ class Identity:
if save_time < 1: time_str = str(round(save_time*1000,2))+"ms"
else: time_str = str(round(save_time,2))+"s"
RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_DEBUG)
RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_VERBOSE)
except Exception as e:
RNS.log("Error while saving known destinations to disk, the contained exception was: "+str(e), RNS.LOG_ERROR)
@ -275,20 +282,54 @@ class Identity:
@staticmethod
def clean_known_destinations():
st = time.time()
now = time.time()
st = now
total = len(Identity.known_destinations)
stale = []
no_path = 0
retained = 0
never_used = 0
for destination_hash in Identity.known_destinations:
try:
if not RNS.Transport.has_path(destination_hash): no_path += 1
if Identity.known_destinations[destination_hash][4] == 0: never_used += 1
elif Identity.known_destinations[destination_hash][4] == -1: retained += 1
if RNS.Transport.has_path(destination_hash): has_path = True
else:
has_path = False
no_path += 1
except Exception as e: RNS.log(f"Faulty entry for {RNS.prettyhexrep(destination_hash)}")
with Identity.known_destinations_lock:
if destination_hash in Identity.known_destinations:
last_announce = Identity.known_destinations[destination_hash][0]
last_use = 0
was_used = False
is_retained = False
RNS.log(f"Total destinations: {total}, no path: {no_path}, never used: {never_used}, with path: {total-no_path}, used: {total-never_used}, retained: {retained}. Completed in {RNS.prettyshorttime(time.time()-st)}")
if Identity.known_destinations[destination_hash][4] > 0:
was_used = True
last_use = Identity.known_destinations[destination_hash][4]
elif Identity.known_destinations[destination_hash][4] == 0:
was_used = False
never_used += 1
elif Identity.known_destinations[destination_hash][4] == -1:
is_retained = True
retained += 1
if not is_retained and not has_path:
if never_used and now - last_announce > RNS.Transport.UNUSED_DESTINATION_LINGER: stale.append(destination_hash)
elif now - last_use > RNS.Transport.DESTINATION_TIMEOUT*1.25: stale.append(destination_hash)
except Exception as e: RNS.log(f"Faulty entry for {RNS.prettyhexrep(destination_hash)} while cleaning known destinations: {e}", RNS.LOG_DEBUG)
removed = 0
for destination_hash in stale:
with Identity.known_destinations_lock:
if destination_hash in Identity.known_destinations:
Identity.known_destinations.pop(destination_hash)
removed += 1
# RNS.log(f"Total destinations: {total}, stale: {len(stale)}, removed: {removed}, no path: {no_path}, never used: {never_used}, with path: {total-no_path}, used: {total-never_used}, retained: {retained}. Completed in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_WARNING) # TODO: Remove
if not RNS.Transport.owner.is_connected_to_shared_instance: Identity.save_known_destinations(recombine=False)
@staticmethod
def full_hash(data):

View file

@ -87,6 +87,7 @@ class Transport:
LINK_TIMEOUT = RNS.Link.STALE_TIME * 1.25
REVERSE_TIMEOUT = 8*60 # Reverse table entries are removed after 8 minutes
DESTINATION_TIMEOUT = 60*60*24*7 # Destination table entries are removed if unused for one week
UNUSED_DESTINATION_LINGER = 6*60 # Linger time for pathless and never used destinations
TUNNEL_TIMEOUT = 60*60*8 # Tunnel table entries are removed if unused for eight hours
TUNNEL_PATH_TIMEOUT = 60*60*8 # Tunnel path table entries are removed if unused for eight hours
MAX_RECEIPTS = 1024 # Maximum number of receipts to keep track of
@ -177,6 +178,8 @@ class Transport:
pending_prs_check_interval = 30.0
cache_last_cleaned = 0.0
cache_clean_interval = 5*60
destinations_last_cleaned = 0.0
known_destinations_interval = 5*60
tables_last_culled = 0.0
tables_cull_interval = 5.0
interface_last_jobs = 0.0
@ -263,6 +266,9 @@ class Transport:
# Defer cleaning packet cache for 60 seconds
Transport.cache_last_cleaned = time.time() + 60
# Defer cleaning known destinations
Transport.destinations_last_cleaned = time.time()
# Defer sending management announces for 15 seconds
Transport.last_mgmt_announce = time.time() - Transport.mgmt_announce_interval + 15
@ -897,6 +903,12 @@ class Transport:
def job(): Transport.clean_cache()
threading.Thread(target=job, daemon=True).start()
# Clean known destinations
if time.time() > Transport.destinations_last_cleaned+Transport.known_destinations_interval:
Transport.destinations_last_cleaned = time.time()
def job(): RNS.Identity.clean_known_destinations()
threading.Thread(target=job, daemon=True).start()
# Send announces for management destinations
if time.time() > Transport.last_mgmt_announce+Transport.mgmt_announce_interval:
try: