Keep track of which known destinations are actually in use, so irrelevant destination data can be cleaned

This commit is contained in:
Mark Qvist 2026-04-20 23:48:57 +02:00
commit b5658c4865
6 changed files with 118 additions and 23 deletions

View file

@ -94,17 +94,19 @@ class Identity:
known_ratchets = {}
ratchet_persist_lock = threading.Lock()
known_destinations_lock = threading.Lock()
@staticmethod
def remember(packet_hash, destination_hash, public_key, app_data = None):
if len(public_key) != Identity.KEYSIZE//8:
raise TypeError("Can't remember "+RNS.prettyhexrep(destination_hash)+", the public key size of "+str(len(public_key))+" is not valid.", RNS.LOG_ERROR)
else:
Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data]
with Identity.known_destinations_lock:
Identity.known_destinations[destination_hash] = [time.time(), packet_hash, public_key, app_data, 0]
@staticmethod
def recall(target_hash, from_identity_hash=False):
def recall(target_hash, from_identity_hash=False, _no_use=False):
"""
Recall identity for a destination or identity hash. By default, this function
will return the identity associated with a given *destination* hash. As an
@ -120,6 +122,7 @@ class Identity:
if from_identity_hash:
for destination_hash in Identity.known_destinations:
if target_hash == Identity.truncated_hash(Identity.known_destinations[destination_hash][2]):
if not _no_use: RNS.Reticulum.get_instance()._used_destination_data(destination_hash)
identity_data = Identity.known_destinations[destination_hash]
identity = Identity(create_keys=False)
identity.load_public_key(identity_data[2])
@ -130,6 +133,7 @@ class Identity:
else:
if target_hash in Identity.known_destinations:
if not _no_use: RNS.Reticulum.get_instance()._used_destination_data(target_hash)
identity_data = Identity.known_destinations[target_hash]
identity = Identity(create_keys=False)
identity.load_public_key(identity_data[2])
@ -146,7 +150,7 @@ class Identity:
return None
@staticmethod
def recall_app_data(destination_hash):
def recall_app_data(destination_hash, _no_use=False):
"""
Recall last heard app_data for a destination hash.
@ -154,10 +158,11 @@ class Identity:
:returns: *Bytes* containing app_data, or *None* if the destination is unknown.
"""
if destination_hash in Identity.known_destinations:
if not _no_use: RNS.Reticulum.get_instance()._used_destination_data(destination_hash)
app_data = Identity.known_destinations[destination_hash][3]
return app_data
else:
return None
else: return None
@staticmethod
def save_known_destinations(background=False):
@ -192,7 +197,9 @@ class Identity:
try:
for destination_hash in storage_known_destinations:
if not destination_hash in Identity.known_destinations:
Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]
with Identity.known_destinations_lock:
Identity.known_destinations[destination_hash] = storage_known_destinations[destination_hash]
except Exception as e:
RNS.log("Skipped recombining known destinations from disk, since an error occurred: "+str(e), RNS.LOG_WARNING)
@ -201,10 +208,8 @@ class Identity:
umsgpack.dump(Identity.known_destinations.copy(), file)
save_time = time.time() - save_start
if save_time < 1:
time_str = str(round(save_time*1000,2))+"ms"
else:
time_str = str(round(save_time,2))+"s"
if save_time < 1: time_str = str(round(save_time*1000,2))+"ms"
else: time_str = str(round(save_time,2))+"s"
RNS.log("Saved known destinations to storage in "+time_str, RNS.LOG_DEBUG)
@ -217,6 +222,7 @@ class Identity:
@staticmethod
def load_known_destinations():
if os.path.isfile(RNS.Reticulum.storagepath+"/known_destinations"):
st = time.time()
try:
with open(RNS.Reticulum.storagepath+"/known_destinations","rb") as file:
loaded_known_destinations = umsgpack.load(file)
@ -224,15 +230,66 @@ class Identity:
Identity.known_destinations = {}
for known_destination in loaded_known_destinations:
if len(known_destination) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
Identity.known_destinations[known_destination] = loaded_known_destinations[known_destination]
if len(loaded_known_destinations[known_destination]) < 5:
e = loaded_known_destinations[known_destination]
loaded_known_destinations[known_destination] = [e[0], e[1], e[2], e[3], 0]
RNS.log("Loaded "+str(len(Identity.known_destinations))+" known destination from storage", RNS.LOG_VERBOSE)
with Identity.known_destinations_lock:
Identity.known_destinations[known_destination] = loaded_known_destinations[known_destination]
RNS.log(f"Loaded {len(Identity.known_destinations)} known destination from storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_VERBOSE)
except Exception as e:
RNS.log("Error loading known destinations from disk, file will be recreated on exit", RNS.LOG_ERROR)
RNS.trace_exception(e)
else:
RNS.log("Destinations file does not exist, no known destinations loaded", RNS.LOG_VERBOSE)
@staticmethod
def _used_destination_data(destination_hash):
with Identity.known_destinations_lock:
if destination_hash in Identity.known_destinations:
if not Identity.known_destinations[destination_hash][4] < 0:
Identity.known_destinations[destination_hash][4] = time.time()
return True
return False
@staticmethod
def _retain_destination_data(destination_hash):
with Identity.known_destinations_lock:
if destination_hash in Identity.known_destinations:
Identity.known_destinations[destination_hash][4] = -1
return True
return False
@staticmethod
def _unretain_destination_data(destination_hash):
with Identity.known_destinations_lock:
if destination_hash in Identity.known_destinations:
Identity.known_destinations[destination_hash][4] = time.time()
return True
return False
@staticmethod
def clean_known_destinations():
st = time.time()
total = len(Identity.known_destinations)
no_path = 0
retained = 0
never_used = 0
for destination_hash in Identity.known_destinations:
try:
if not RNS.Transport.has_path(destination_hash): no_path += 1
if Identity.known_destinations[destination_hash][4] == 0: never_used += 1
elif Identity.known_destinations[destination_hash][4] == -1: retained += 1
except Exception as e: RNS.log(f"Faulty entry for {RNS.prettyhexrep(destination_hash)}")
RNS.log(f"Total destinations: {total}, no path: {no_path}, never used: {never_used}, with path: {total-no_path}, used: {total-never_used}, retained: {retained}. Completed in {RNS.prettyshorttime(time.time()-st)}")
@staticmethod
def full_hash(data):
"""

View file

@ -1087,6 +1087,13 @@ class Reticulum:
identity_hash = call["unblackhole_identity"]
rpc_connection.send(self.unblackhole_identity(identity_hash))
if "destination_data" in call:
operation = call["destination_data"]
destination_hash = call["destination_hash"]
if operation == "used": rpc_connection.send(self._used_destination_data(destination_hash))
elif operation == "retain": rpc_connection.send(self._retain_destination_data(destination_hash))
elif operation == "unretain": rpc_connection.send(self._unretain_destination_data(destination_hash))
rpc_connection.close()
except Exception as e:
@ -1094,6 +1101,33 @@ class Reticulum:
def get_rpc_client(self): return multiprocessing.connection.Client(self.rpc_addr, family=self.rpc_type, authkey=self.rpc_key)
def _used_destination_data(self, destination_hash):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"destination_data": "used", "destination_hash": destination_hash})
response = rpc_connection.recv()
return response
else: return RNS.Identity._used_destination_data(destination_hash)
def _retain_destination_data(self, destination_hash):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"destination_data": "retain", "destination_hash": destination_hash})
response = rpc_connection.recv()
return response
else: return RNS.Identity._retain_destination_data(destination_hash)
def _unretain_destination_data(self, destination_hash):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"destination_data": "unretain", "destination_hash": destination_hash})
response = rpc_connection.recv()
return response
else: return RNS.Identity._unretain_destination_data(destination_hash)
def get_interface_stats(self):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()

View file

@ -297,7 +297,7 @@ class Transport:
blackholed = False
if len(Transport.blackholed_identities) > 0:
path_identity = RNS.Identity.recall(destination_hash)
path_identity = RNS.Identity.recall(destination_hash, _no_use=True)
if path_identity in Transport.blackholed_identities: blackholed = True
del path_identity
@ -566,7 +566,7 @@ class Transport:
announce_context = RNS.Packet.NONE
if block_rebroadcasts: announce_context = RNS.Packet.PATH_RESPONSE
announce_data = packet.data
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_identity = RNS.Identity.recall(packet.destination_hash, _no_use=True)
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
announce_destination.hash = packet.destination_hash
announce_destination.hexhash = announce_destination.hash.hex()
@ -1824,7 +1824,7 @@ class Transport:
# If we have any local clients connected, we re-
# transmit the announce to them immediately
if (len(Transport.local_client_interfaces)):
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_identity = RNS.Identity.recall(packet.destination_hash, _no_use=True)
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
announce_destination.hash = packet.destination_hash
announce_destination.hexhash = announce_destination.hash.hex()
@ -1878,7 +1878,7 @@ class Transport:
interface_str = " on "+str(attached_interface)
RNS.log("Got matching announce, answering waiting discovery path request for "+RNS.prettyhexrep(packet.destination_hash)+interface_str, RNS.LOG_DEBUG)
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_identity = RNS.Identity.recall(packet.destination_hash, _no_use=False)
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
announce_destination.hash = packet.destination_hash
announce_destination.hexhash = announce_destination.hash.hex()
@ -1927,7 +1927,7 @@ class Transport:
# Check that the announced destination matches
# the handlers aspect filter
execute_callback = False
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_identity = RNS.Identity.recall(packet.destination_hash, _no_use=True)
if handler.aspect_filter == None:
# If the handlers aspect filter is set to
# None, we execute the callback in all cases
@ -1947,14 +1947,14 @@ class Transport:
def job(handler=handler, packet=packet, announce_identity=announce_identity):
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash))
app_data=RNS.Identity.recall_app_data(packet.destination_hash, _no_use=True))
threading.Thread(target=job, daemon=True).start()
elif len(inspect.signature(handler.received_announce).parameters) == 4:
def job(handler=handler, packet=packet, announce_identity=announce_identity):
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
app_data=RNS.Identity.recall_app_data(packet.destination_hash, _no_use=True),
announce_packet_hash = packet.packet_hash)
threading.Thread(target=job, daemon=True).start()
@ -1962,7 +1962,7 @@ class Transport:
def job(handler=handler, packet=packet, announce_identity=announce_identity):
handler.received_announce(destination_hash=packet.destination_hash,
announced_identity=announce_identity,
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
app_data=RNS.Identity.recall_app_data(packet.destination_hash, _no_use=True),
announce_packet_hash = packet.packet_hash,
is_path_response = packet.context == RNS.Packet.PATH_RESPONSE)
threading.Thread(target=job, daemon=True).start()
@ -2070,7 +2070,7 @@ class Transport:
signalling_bytes = RNS.Link.signalling_bytes(RNS.Link.mtu_from_lp_packet(packet), RNS.Link.mode_from_lp_packet(packet))
peer_pub_bytes = packet.data[RNS.Identity.SIGLENGTH//8:RNS.Identity.SIGLENGTH//8+RNS.Link.ECPUBSIZE//2]
peer_identity = RNS.Identity.recall(link_entry[IDX_LT_DSTHASH])
peer_identity = RNS.Identity.recall(link_entry[IDX_LT_DSTHASH], _no_use=True)
peer_sig_pub_bytes = peer_identity.get_public_key()[RNS.Link.ECPUBSIZE//2:RNS.Link.ECPUBSIZE]
signed_data = packet.destination_hash+peer_pub_bytes+peer_sig_pub_bytes+signalling_bytes
@ -2083,6 +2083,8 @@ class Transport:
new_raw += packet.raw[2:]
Transport.link_table[packet.destination_hash][IDX_LT_VALIDATED] = True
Transport.transmit(link_entry[IDX_LT_RCVD_IF], new_raw)
if not Transport.owner.is_connected_to_shared_instance:
RNS.Identity._used_destination_data(link_entry[IDX_LT_DSTHASH])
else:
RNS.log("Invalid link request proof in transport for link "+RNS.prettyhexrep(packet.destination_hash)+", dropping proof.", RNS.LOG_DEBUG)
@ -2884,6 +2886,8 @@ class Transport:
with Transport.announce_table_lock:
Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface]
if not Transport.owner.is_connected_to_shared_instance: RNS.Identity._used_destination_data(packet.destination_hash)
elif is_from_local_client:
# Forward path request on all interfaces
# except the local client
@ -3326,7 +3330,7 @@ class Transport:
drop_destinations = []
for destination_hash in Transport.path_table.copy():
try:
associated_identity = RNS.Identity.recall(destination_hash)
associated_identity = RNS.Identity.recall(destination_hash, _no_use=True)
if associated_identity and associated_identity.hash in Transport.blackholed_identities:
if not destination_hash in drop_destinations: drop_destinations.append(destination_hash)
except Exception as e: