diff --git a/RNS/Destination.py b/RNS/Destination.py index 8729685..a81021c 100755 --- a/RNS/Destination.py +++ b/RNS/Destination.py @@ -411,7 +411,9 @@ class Destination: else: if packet.packet_type == RNS.Packet.DATA: if self.callbacks.packet != None: - try: self.callbacks.packet(plaintext, packet) + try: + def job(): self.callbacks.packet(plaintext, packet) + threading.Thread(target=job, daemon=True).start() except Exception as e: RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) diff --git a/RNS/Transport.py b/RNS/Transport.py index 79cfd6c..4665de8 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -96,6 +96,7 @@ class Transport: interfaces = [] # All active interfaces destinations = [] # All active destinations + destinations_map = {} # Destination hash map of active destinations pending_links = [] # Links that are being established active_links = [] # Links that are active packet_hashlist = set() # A list of packet hashes for duplicate detection @@ -121,7 +122,9 @@ class Transport: discovery_pr_tags = [] # A table for keeping track of tagged path requests max_pr_tags = 32000 # Maximum amount of unique path request tags to remember + interfaces_lock = Lock() destinations_lock = Lock() + destinations_map_lock = Lock() inbound_announce_lock = Lock() announce_table_lock = Lock() announce_rate_table_lock = Lock() @@ -305,7 +308,8 @@ class Transport: # over an interface. It is cached with it's non- # increased hop-count. announce_packet.hops += 1 - Transport.path_table[destination_hash] = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet.packet_hash] + with Transport.path_table_lock: + Transport.path_table[destination_hash] = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet.packet_hash] RNS.log("Loaded path table entry for "+RNS.prettyhexrep(destination_hash)+" from storage", RNS.LOG_DEBUG) else: RNS.log("Could not reconstruct path table entry from storage for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) @@ -410,8 +414,9 @@ class Transport: @staticmethod def prioritize_interfaces(): - try: Transport.interfaces.sort(key=lambda interface: interface.bitrate, reverse=True) - except Exception as e: RNS.log(f"Could not prioritize interfaces according to bitrate. The contained exception was: {e}", RNS.LOG_ERROR) + with Transport.interfaces_lock: + try: Transport.interfaces.sort(key=lambda interface: interface.bitrate, reverse=True) + except Exception as e: RNS.log(f"Could not prioritize interfaces according to bitrate. The contained exception was: {e}", RNS.LOG_ERROR) @staticmethod def enable_discovery(): @@ -610,10 +615,14 @@ class Transport: # Cull invalidated path requests if time.time() > Transport.pending_prs_last_checked+Transport.pending_prs_check_interval: + stale_local_prs = [] with Transport.pending_local_prs_lock: for destination_hash in Transport.pending_local_path_requests: if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces: - Transport.pending_local_path_requests.pop(destination_hash) + stale_local_prs.append(destination_hash) + + for destination_hash in stale_local_prs: + Transport.pending_local_path_requests.pop(destination_hash) Transport.pending_prs_last_checked = time.time() @@ -1091,8 +1100,10 @@ class Transport: should_transmit = False elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: - with Transport.destinations_lock: - local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + local_destination = None + with Transport.destinations_map_lock: + if packet.destination_hash in Transport.destinations_map: + local_destination = Transport.destinations_map[packet.destination_hash] if local_destination != None: # RNS.log("Allowing announce broadcast on roaming-mode interface from instance-local destination", RNS.LOG_EXTREME) @@ -1114,8 +1125,11 @@ class Transport: should_transmit = False elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: - with Transport.destinations_lock: - local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + local_destination = None + with Transport.destinations_map_lock: + if packet.destination_hash in Transport.destinations_map: + local_destination = Transport.destinations_map[packet.destination_hash] + if local_destination != None: # RNS.log("Allowing announce broadcast on boundary-mode interface from instance-local destination", RNS.LOG_EXTREME) pass @@ -1584,8 +1598,10 @@ class Transport: interface.hold_announce(packet) return - with Transport.destinations_lock: - local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + local_destination = None + with Transport.destinations_map_lock: + if packet.destination_hash in Transport.destinations_map: + local_destination = Transport.destinations_map[packet.destination_hash] if local_destination == None and RNS.Identity.validate_announce(packet): if packet.transport_id != None: @@ -1622,7 +1638,9 @@ class Transport: # First, check that the announce is not for a destination # local to this system, and that hops are less than the max - with Transport.destinations_lock: local_and_hops_condition = (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1) + with Transport.destinations_map_lock: + local_and_hops_condition = (packet.hops < Transport.PATHFINDER_M+1) and (not packet.destination_hash in Transport.destinations_map) + if local_and_hops_condition: announce_emitted = Transport.announce_emitted(packet) @@ -1960,31 +1978,35 @@ class Transport: # Handling for link requests to local destinations elif packet.packet_type == RNS.Packet.LINKREQUEST: if packet.transport_id == None or packet.transport_id == Transport.identity.hash: - for destination in Transport.destinations: - if destination.hash == packet.destination_hash and destination.type == packet.destination_type: - path_mtu = RNS.Link.mtu_from_lr_packet(packet) - mode = RNS.Link.mode_from_lr_packet(packet) - if packet.receiving_interface.AUTOCONFIGURE_MTU or packet.receiving_interface.FIXED_MTU: - nh_mtu = packet.receiving_interface.HW_MTU + destination = None + with Transport.destinations_map_lock: + if packet.destination_hash in Transport.destinations_map: + destination = Transport.destinations_map[packet.destination_hash] + + if destination and destination.type == packet.destination_type: + path_mtu = RNS.Link.mtu_from_lr_packet(packet) + mode = RNS.Link.mode_from_lr_packet(packet) + if packet.receiving_interface.AUTOCONFIGURE_MTU or packet.receiving_interface.FIXED_MTU: + nh_mtu = packet.receiving_interface.HW_MTU + else: + nh_mtu = RNS.Reticulum.MTU + + if path_mtu: + if packet.receiving_interface.HW_MTU == None: + path_mtu = None + packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE] else: - nh_mtu = RNS.Reticulum.MTU + if nh_mtu < path_mtu: + try: + path_mtu = nh_mtu + clamped_mtu = RNS.Link.signalling_bytes(path_mtu, mode) + packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]+clamped_mtu + except Exception as e: + RNS.log(f"Dropping link request packet to local destination. The contained exception was: {e}", RNS.LOG_WARNING) + return - if path_mtu: - if packet.receiving_interface.HW_MTU == None: - path_mtu = None - packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE] - else: - if nh_mtu < path_mtu: - try: - path_mtu = nh_mtu - clamped_mtu = RNS.Link.signalling_bytes(path_mtu, mode) - packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]+clamped_mtu - except Exception as e: - RNS.log(f"Dropping link request packet to local destination. The contained exception was: {e}", RNS.LOG_WARNING) - return - - packet.destination = destination - destination.receive(packet) + packet.destination = destination + destination.receive(packet) # Handling for local data packets elif packet.packet_type == RNS.Packet.DATA: @@ -2015,19 +2037,22 @@ class Transport: while packet.packet_hash in Transport.packet_hashlist: Transport.packet_hashlist.remove(packet.packet_hash) else: - for destination in Transport.destinations: - if destination.hash == packet.destination_hash and destination.type == packet.destination_type: - packet.destination = destination - if destination.receive(packet): - if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove() + destination = None + with Transport.destinations_map_lock: + if packet.destination_hash in Transport.destinations_map: + destination = Transport.destinations_map[packet.destination_hash] - elif destination.proof_strategy == RNS.Destination.PROVE_APP: - if destination.callbacks.proof_requested: - try: - if destination.callbacks.proof_requested(packet): packet.prove() - except Exception as e: - RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR) - break + if destination and destination.type == packet.destination_type: + packet.destination = destination + if destination.receive(packet): + if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove() + + elif destination.proof_strategy == RNS.Destination.PROVE_APP: + if destination.callbacks.proof_requested: + try: + if destination.callbacks.proof_requested(packet): packet.prove() + except Exception as e: + RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR) # Handling for proofs and link-request proofs elif packet.packet_type == RNS.Packet.PROOF: @@ -2254,6 +2279,27 @@ class Transport: RNS.log("Removing path to "+RNS.prettyhexrep(deprecated_path)+" from tunnel "+RNS.prettyhexrep(tunnel_id), RNS.LOG_DEBUG) with Transport.tunnels_lock: paths.pop(deprecated_path) + @staticmethod + def clean_destinations_map(): + with Transport.destinations_lock: + for destination in Transport.destinations: + with Transport.destinations_map_lock: + if not destination.hash in Transport.destinations_map: + Transport.destinations_map[destination.hash] = destination + + with Transport.destinations_map_lock: + stale_destination_hashes = [] + for destination_hash in Transport.destinations_map: + with Transport.destinations_lock: + found = False + for destination in Transport.destinations: + if destination.hash == destination_hash: found = True + + if not found: stale_destination_hashes.append(destination_hash) + + for destination_hash in stale_destination_hashes: + Transport.destinations_map.pop(destination_hash) + @staticmethod def register_destination(destination): destination.MTU = RNS.Reticulum.MTU @@ -2265,6 +2311,9 @@ class Transport: Transport.destinations.append(destination) + with Transport.destinations_map_lock: + Transport.destinations_map[destination.hash] = destination + if Transport.owner.is_connected_to_shared_instance: if destination.type == RNS.Destination.SINGLE: def job(): @@ -2277,6 +2326,10 @@ class Transport: with Transport.destinations_lock: if destination in Transport.destinations: Transport.destinations.remove(destination) + with Transport.destinations_map_lock: + if destination.hash in Transport.destinations_map: + Transport.destinations_map.pop(destination.hash) + @staticmethod def register_link(link): RNS.log("Registering link "+str(link), RNS.LOG_EXTREME) @@ -2761,8 +2814,11 @@ class Transport: destination_exists_on_local_client = True with Transport.pending_local_prs_lock: Transport.pending_local_path_requests[destination_hash] = attached_interface - - local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None) + + local_destination = None + with Transport.destinations_map_lock: + if destination_hash in Transport.destinations_map: local_destination = Transport.destinations_map[destination_hash] + if local_destination != None: local_destination.announce(path_response=True, tag=tag, attached_interface=attached_interface) RNS.log("Answering path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", destination is local to this system", RNS.LOG_DEBUG) diff --git a/docs/Reticulum Manual.epub b/docs/Reticulum Manual.epub index 7172db6..2dc91eb 100644 Binary files a/docs/Reticulum Manual.epub and b/docs/Reticulum Manual.epub differ diff --git a/docs/Reticulum Manual.pdf b/docs/Reticulum Manual.pdf index a850ae4..9a4b303 100644 Binary files a/docs/Reticulum Manual.pdf and b/docs/Reticulum Manual.pdf differ