mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-04-27 14:20:35 +00:00
Improved ratchet cleaning. Added inbound packet wait during transport core initialization.
This commit is contained in:
parent
31cc9fc7d1
commit
c71f5d8c5e
3 changed files with 26 additions and 7 deletions
|
|
@ -431,33 +431,39 @@ class Identity:
|
|||
def _clean_ratchets():
|
||||
RNS.log("Cleaning ratchets...", RNS.LOG_DEBUG)
|
||||
try:
|
||||
count = 0
|
||||
removed = 0
|
||||
not_known = 0
|
||||
now = time.time()
|
||||
ratchetdir = RNS.Reticulum.storagepath+"/ratchets"
|
||||
if os.path.isdir(ratchetdir):
|
||||
for filename in os.listdir(ratchetdir):
|
||||
count += 1
|
||||
try:
|
||||
expired = False
|
||||
corrupted = False
|
||||
with open(f"{ratchetdir}/{filename}", "rb") as rf:
|
||||
# TODO: Remove individual ratchet file if corrupt
|
||||
try:
|
||||
ratchet_data = umsgpack.unpackb(rf.read())
|
||||
if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY:
|
||||
expired = True
|
||||
if now > ratchet_data["received"]+Identity.RATCHET_EXPIRY: expired = True
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Corrupted ratchet data while reading {ratchetdir}/{filename}, removing file", RNS.LOG_ERROR)
|
||||
corrupted = True
|
||||
|
||||
destination_hash = bytes.fromhex(filename)
|
||||
if not destination_hash in RNS.Identity.known_destinations: unknown = True; not_known += 1
|
||||
else: unknown = False
|
||||
|
||||
if expired or corrupted:
|
||||
os.unlink(f"{ratchetdir}/{filename}")
|
||||
removed += 1
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"An error occurred while cleaning ratchets, in the processing of {ratchetdir}/{filename}.", RNS.LOG_ERROR)
|
||||
RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
except Exception as e: RNS.log(f"An error occurred while cleaning ratchets. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
|
||||
@staticmethod
|
||||
def get_ratchet(destination_hash):
|
||||
|
|
|
|||
|
|
@ -325,6 +325,7 @@ class Reticulum:
|
|||
RNS.log(f"Configuration loaded from {self.configpath}", RNS.LOG_VERBOSE)
|
||||
|
||||
RNS.Identity.load_known_destinations()
|
||||
if not self.is_connected_to_shared_instance: RNS.Identity._clean_ratchets()
|
||||
RNS.Transport.start(self)
|
||||
|
||||
if self.use_af_unix:
|
||||
|
|
@ -354,7 +355,6 @@ class Reticulum:
|
|||
|
||||
def __start_jobs(self):
|
||||
if self.jobs_thread == None:
|
||||
RNS.Identity._clean_ratchets()
|
||||
self.jobs_thread = threading.Thread(target=self.__jobs)
|
||||
self.jobs_thread.daemon = True
|
||||
self.jobs_thread.start()
|
||||
|
|
|
|||
|
|
@ -95,6 +95,7 @@ class Transport:
|
|||
MAX_RATE_TIMESTAMPS = 16 # Maximum number of announce timestamps to keep per destination
|
||||
PERSIST_RANDOM_BLOBS = 32 # Maximum number of random blobs per destination to persist to disk
|
||||
MAX_RANDOM_BLOBS = 64 # Maximum number of random blobs per destination to keep in memory
|
||||
READY_WAIT = 60 # Maximum wait time for inbound packets received before transport core was ready
|
||||
|
||||
interfaces = [] # All active interfaces
|
||||
destinations = [] # All active destinations
|
||||
|
|
@ -166,6 +167,7 @@ class Transport:
|
|||
|
||||
pending_local_path_requests = {}
|
||||
|
||||
ready = False
|
||||
start_time = None
|
||||
hashlist_maxsize = 1000000
|
||||
job_interval = 0.250
|
||||
|
|
@ -400,6 +402,7 @@ class Transport:
|
|||
|
||||
# Sort interfaces according to bitrate
|
||||
Transport.prioritize_interfaces()
|
||||
Transport.ready = True
|
||||
|
||||
# Synthesize tunnels for any interfaces wanting it
|
||||
for interface in Transport.interfaces:
|
||||
|
|
@ -910,7 +913,9 @@ class Transport:
|
|||
if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval:
|
||||
Transport.prioritize_interfaces()
|
||||
try:
|
||||
for interface in Transport.interfaces: interface.process_held_announces()
|
||||
for interface in Transport.interfaces:
|
||||
interface.process_held_announces()
|
||||
if interface.phy_keepalive: interface.send_keepalive()
|
||||
Transport.interface_last_jobs = time.time()
|
||||
except Exception as e:
|
||||
RNS.log(f"Error while processing held per-interface announces: {e}", RNS.LOG_WARNING)
|
||||
|
|
@ -1320,6 +1325,14 @@ class Transport:
|
|||
|
||||
@staticmethod
|
||||
def inbound(raw, interface=None):
|
||||
if not Transport.ready:
|
||||
wait_start = time.time()
|
||||
while not Transport.ready:
|
||||
time.sleep(0.25)
|
||||
if time.time() > wait_start + Transport.READY_WAIT:
|
||||
RNS.log("Inbound packet timed out waiting for transport startup, dropping", RNS.LOG_WARNING)
|
||||
return
|
||||
|
||||
# If interface access codes are enabled,
|
||||
# we must authenticate each packet.
|
||||
if len(raw) > 2:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue