diff --git a/RNS/Identity.py b/RNS/Identity.py index 15255ca..cee77f9 100644 --- a/RNS/Identity.py +++ b/RNS/Identity.py @@ -431,33 +431,39 @@ class Identity: def _clean_ratchets(): RNS.log("Cleaning ratchets...", RNS.LOG_DEBUG) try: + count = 0 + removed = 0 + not_known = 0 now = time.time() ratchetdir = RNS.Reticulum.storagepath+"/ratchets" if os.path.isdir(ratchetdir): for filename in os.listdir(ratchetdir): + count += 1 try: expired = False corrupted = False with open(f"{ratchetdir}/{filename}", "rb") as rf: - # TODO: Remove individual ratchet file if corrupt try: ratchet_data = umsgpack.unpackb(rf.read()) - if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY: - expired = True + if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY: expired = True except Exception as e: RNS.log(f"Corrupted ratchet data while reading {ratchetdir}/{filename}, removing file", RNS.LOG_ERROR) corrupted = True + destination_hash = bytes.fromhex(filename) + if not destination_hash in RNS.Identity.known_destinations: unknown = True; not_known += 1 + else: unknown = False + if expired or corrupted: os.unlink(f"{ratchetdir}/{filename}") + removed += 1 except Exception as e: RNS.log(f"An error occurred while cleaning ratchets, in the processing of {ratchetdir}/{filename}.", RNS.LOG_ERROR) RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR) - except Exception as e: - RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR) + except Exception as e: RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR) @staticmethod def get_ratchet(destination_hash): diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index a38e11e..43d9c0b 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -325,6 +325,7 @@ class Reticulum: RNS.log(f"Configuration loaded from {self.configpath}", RNS.LOG_VERBOSE) RNS.Identity.load_known_destinations() + if not self.is_connected_to_shared_instance: RNS.Identity._clean_ratchets() RNS.Transport.start(self) if self.use_af_unix: @@ -354,7 +355,6 @@ class Reticulum: def __start_jobs(self): if self.jobs_thread == None: - RNS.Identity._clean_ratchets() self.jobs_thread = threading.Thread(target=self.__jobs) self.jobs_thread.daemon = True self.jobs_thread.start() diff --git a/RNS/Transport.py b/RNS/Transport.py index b24d4e1..dce090e 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -95,6 +95,7 @@ class Transport: MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination PERSIST_RANDOM_BLOBS = 32 # Maximum number of random blobs per destination to persist to disk MAX_RANDOM_BLOBS = 64 # Maximum number of random blobs per destination to keep in memory + READY_WAIT = 60 # Maximum wait time for inbound packets received before transport core was ready interfaces = [] # All active interfaces destinations = [] # All active destinations @@ -166,6 +167,7 @@ class Transport: pending_local_path_requests = {} + ready = False start_time = None hashlist_maxsize = 1000000 job_interval = 0.250 @@ -400,6 +402,7 @@ class Transport: # Sort interfaces according to bitrate Transport.prioritize_interfaces() + Transport.ready = True # Synthesize tunnels for any interfaces wanting it for interface in Transport.interfaces: @@ -910,7 +913,9 @@ class Transport: if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval: Transport.prioritize_interfaces() try: - for interface in Transport.interfaces: interface.process_held_announces() + for interface in Transport.interfaces: + interface.process_held_announces() + if interface.phy_keepalive: interface.send_keepalive() Transport.interface_last_jobs = time.time() except Exception as e: RNS.log(f"Error while processing held per-interface announces: {e}", RNS.LOG_WARNING) @@ -1320,6 +1325,14 @@ class Transport: @staticmethod def inbound(raw, interface=None): + if not Transport.ready: + wait_start = time.time() + while not Transport.ready: + time.sleep(0.25) + if time.time() > wait_start + Transport.READY_WAIT: + RNS.log("Inbound packet timed out waiting for transport startup, dropping", RNS.LOG_WARNING) + return + # If interface access codes are enabled, # we must authenticate each packet. if len(raw) > 2: