diff --git a/RNS/Destination.py b/RNS/Destination.py index 28251b0..8729685 100755 --- a/RNS/Destination.py +++ b/RNS/Destination.py @@ -295,33 +295,26 @@ class Destination: app_data = returned_app_data signed_data = self.hash+self.identity.get_public_key()+self.name_hash+random_hash+ratchet - if app_data != None: - signed_data += app_data + if app_data != None: signed_data += app_data signature = self.identity.sign(signed_data) announce_data = self.identity.get_public_key()+self.name_hash+random_hash+ratchet+signature - if app_data != None: - announce_data += app_data + if app_data != None: announce_data += app_data self.path_responses[tag] = [time.time(), announce_data] - if path_response: - announce_context = RNS.Packet.PATH_RESPONSE - else: - announce_context = RNS.Packet.NONE + if path_response: announce_context = RNS.Packet.PATH_RESPONSE + else: announce_context = RNS.Packet.NONE - if ratchet: - context_flag = RNS.Packet.FLAG_SET - else: - context_flag = RNS.Packet.FLAG_UNSET + if ratchet: context_flag = RNS.Packet.FLAG_SET + else: context_flag = RNS.Packet.FLAG_UNSET announce_packet = RNS.Packet(self, announce_data, RNS.Packet.ANNOUNCE, context = announce_context, attached_interface = attached_interface, context_flag=context_flag) - if send: - announce_packet.send() - else: - return announce_packet + + if send: announce_packet.send() + else: return announce_packet def accepts_links(self, accepts = None): """ @@ -330,13 +323,10 @@ class Destination: :param accepts: If ``True`` or ``False``, this method sets whether the destination accepts incoming link requests. If not provided or ``None``, the method returns whether the destination currently accepts link requests. :returns: ``True`` or ``False`` depending on whether the destination accepts incoming link requests, if the *accepts* parameter is not provided or ``None``. """ - if accepts == None: - return self.accept_link_requests + if accepts == None: return self.accept_link_requests - if accepts: - self.accept_link_requests = True - else: - self.accept_link_requests = False + if accepts: self.accept_link_requests = True + else: self.accept_link_requests = False def set_link_established_callback(self, callback): """ diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index 87538e7..b8ec9e3 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -55,8 +55,8 @@ class Interface: # How many samples to use for announce # frequency calculations - IA_FREQ_SAMPLES = 12 - OA_FREQ_SAMPLES = 12 + IA_FREQ_SAMPLES = 128 + OA_FREQ_SAMPLES = 128 # Maximum amount of ingress limited announces # to hold at any given time. @@ -66,12 +66,12 @@ class Interface: # considered to be newly created. Two # hours by default. IC_NEW_TIME = 2*60*60 - IC_BURST_FREQ_NEW = 3.5 - IC_BURST_FREQ = 12 + IC_BURST_FREQ_NEW = 6 + IC_BURST_FREQ = 35 IC_BURST_HOLD = 1*60 - IC_BURST_PENALTY = 5*60 - IC_HELD_RELEASE_INTERVAL = 30 - IC_DEQUE_MIN_SAMPLE = 8 + IC_BURST_PENALTY = 15 + IC_HELD_RELEASE_INTERVAL = 2 + IC_DEQUE_MIN_SAMPLE = 32 AUTOCONFIGURE_MTU = False FIXED_MTU = False @@ -123,13 +123,14 @@ class Interface: if self.ic_burst_active: if ia_freq < freq_threshold and time.time() > self.ic_burst_activated+self.ic_burst_hold: self.ic_burst_active = False - self.ic_held_release = time.time() + self.ic_burst_penalty + return True else: if ia_freq > freq_threshold: self.ic_burst_active = True self.ic_burst_activated = time.time() + self.ic_held_release = time.time() + self.ic_burst_penalty return True else: return False @@ -174,7 +175,7 @@ class Interface: def process_held_announces(self): try: - if not self.should_ingress_limit() and len(self.held_announces) > 0 and time.time() > self.ic_held_release: + if len(self.held_announces) > 0 and time.time() > self.ic_held_release: freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq ia_freq = self.incoming_announce_frequency() if ia_freq < freq_threshold: diff --git a/RNS/Link.py b/RNS/Link.py index 7ae6aae..7835eae 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -722,12 +722,9 @@ class Link: pass def link_closed(self): - for resource in self.incoming_resources: - resource.cancel() - for resource in self.outgoing_resources: - resource.cancel() - if self._channel: - self._channel._shutdown() + for resource in self.incoming_resources: resource.cancel() + for resource in self.outgoing_resources: resource.cancel() + if self._channel: self._channel._shutdown() self.prv = None self.pub = None @@ -741,8 +738,7 @@ class Link: self.destination.links.remove(self) if self.callbacks.link_closed != None: - try: - self.callbacks.link_closed(self) + try: self.callbacks.link_closed(self) except Exception as e: RNS.log("Error while executing link closed callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) diff --git a/RNS/Transport.py b/RNS/Transport.py index e584b9d..0505d29 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -360,7 +360,7 @@ class Transport: if len(tunnel_paths) > 0: tunnel = [tunnel_id, None, tunnel_paths, expires] - Transport.tunnels[tunnel_id] = tunnel + with Transport.tunnels_lock: Transport.tunnels[tunnel_id] = tunnel if len(Transport.path_table) == 1: specifier = "entry" else: specifier = "entries" @@ -908,6 +908,7 @@ class Transport: except Exception as e: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) # TODO: Remove for packet in outgoing: packet.send() @@ -1553,12 +1554,17 @@ class Transport: # potential ingress limiting. Already known # destinations will have re-announces controlled # by normal announce rate limiting. - if interface.should_ingress_limit(): + if packet.destination_hash in Transport.path_requests or packet.destination_hash in Transport.discovery_path_requests: + # RNS.log(f"Skipping ingress limit check for {RNS.prettyhexrep(packet.destination_hash)} due to waiting path requests", RNS.LOG_DEBUG) + pass + + elif interface.should_ingress_limit(): interface.hold_announce(packet) return with Transport.destinations_lock: local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + if local_destination == None and RNS.Identity.validate_announce(packet): if packet.transport_id != None: received_from = packet.transport_id @@ -1575,14 +1581,15 @@ class Transport: if announce_entry[IDX_AT_RETRIES] > 0: if announce_entry[IDX_AT_LCL_RBRD] >= Transport.LOCAL_REBROADCASTS_MAX: RNS.log("Completed announce processing for "+RNS.prettyhexrep(packet.destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME) - if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash) + with Transport.announce_table_lock: + if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash) if packet.hops-1 == announce_entry[IDX_AT_HOPS]+1 and announce_entry[IDX_AT_RETRIES] > 0: now = time.time() if now < announce_entry[IDX_AT_RTRNS_TMO]: RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to another node, no further tries needed", RNS.LOG_EXTREME) - if packet.destination_hash in Transport.announce_table: - Transport.announce_table.pop(packet.destination_hash) + with Transport.announce_table_lock: + if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash) else: received_from = packet.destination_hash @@ -1706,9 +1713,7 @@ class Transport: else: rate_entry["last"] = now - else: - rate_blocked = True - + else: rate_blocked = True retries = 0 announce_hops = packet.hops @@ -1961,29 +1966,31 @@ class Transport: # Handling for local data packets elif packet.packet_type == RNS.Packet.DATA: if packet.destination_type == RNS.Destination.LINK: - for link in Transport.active_links: - if link.link_id == packet.destination_hash: - if link.attached_interface == packet.receiving_interface: - packet.link = link - if packet.context == RNS.Packet.CACHE_REQUEST: - cached_packet = Transport.get_cached_packet(packet.data) - if cached_packet != None: - cached_packet.unpack() - RNS.Packet(destination=link, data=cached_packet.data, - packet_type=cached_packet.packet_type, context=cached_packet.context).send() + with Transport.active_links_lock: + for link in Transport.active_links: + if link.link_id == packet.destination_hash: + if link.attached_interface == packet.receiving_interface: + packet.link = link + if packet.context == RNS.Packet.CACHE_REQUEST: + cached_packet = Transport.get_cached_packet(packet.data) + if cached_packet != None: + cached_packet.unpack() + RNS.Packet(destination=link, data=cached_packet.data, + packet_type=cached_packet.packet_type, context=cached_packet.context).send() + + else: link.receive(packet) + break - else: link.receive(packet) - - else: - # In the strange and rare case that an interface - # is partly malfunctioning, and a link-associated - # packet is being received on an interface that - # has failed sending, and transport has failed over - # to another path, we remove this packet hash from - # the filter hashlist so the link can receive the - # packet when it finally arrives over another path. - while packet.packet_hash in Transport.packet_hashlist: - Transport.packet_hashlist.remove(packet.packet_hash) + else: + # In the strange and rare case that an interface + # is partly malfunctioning, and a link-associated + # packet is being received on an interface that + # has failed sending, and transport has failed over + # to another path, we remove this packet hash from + # the filter hashlist so the link can receive the + # packet when it finally arrives over another path. + while packet.packet_hash in Transport.packet_hashlist: + Transport.packet_hashlist.remove(packet.packet_hash) else: for destination in Transport.destinations: if destination.hash == packet.destination_hash and destination.type == packet.destination_type: @@ -1997,6 +2004,7 @@ class Transport: if destination.callbacks.proof_requested(packet): packet.prove() except Exception as e: RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR) + break # Handling for proofs and link-request proofs elif packet.packet_type == RNS.Packet.PROOF: @@ -2038,48 +2046,57 @@ class Transport: RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) else: RNS.log("Received link request proof with hop mismatch, not transporting it", RNS.LOG_DEBUG) + else: # Check if we can deliver it to a local # pending link - for link in Transport.pending_links: - if link.link_id == packet.destination_hash: - # We need to also allow an expected hops value of - # PATHFINDER_M, since in some cases, the number of hops - # to the destination will be unknown at link creation - # time. The real chance of this occuring is likely to be - # extremely small, and this allowance could probably - # be discarded without major issues, but it is kept - # for now to ensure backwards compatibility. - # TODO: Probably reset check back to - # if packet.hops == link.expected_hops: - # within one of the next releases + pending_link = None + with Transport.pending_links_lock: + for link in Transport.pending_links: + if link.link_id == packet.destination_hash: + # We need to also allow an expected hops value of + # PATHFINDER_M, since in some cases, the number of hops + # to the destination will be unknown at link creation + # time. The real chance of this occuring is likely to be + # extremely small, and this allowance could probably + # be discarded without major issues, but it is kept + # for now to ensure backwards compatibility. - if packet.hops == link.expected_hops or link.expected_hops == RNS.Transport.PATHFINDER_M: - # Add this packet to the filter hashlist if we - # have determined that it's actually destined - # for this system, and then validate the proof - Transport.add_packet_hash(packet.packet_hash) - link.validate_proof(packet) + # TODO: Probably reset check back to + # if packet.hops == link.expected_hops: + # within one of the next releases + + if packet.hops == link.expected_hops or link.expected_hops == RNS.Transport.PATHFINDER_M: + # Add this packet to the filter hashlist if we + # have determined that it's actually destined + # for this system, and then validate the proof + Transport.add_packet_hash(packet.packet_hash) + pending_link = link + break + + if pending_link: pending_link.validate_proof(packet) elif packet.context == RNS.Packet.RESOURCE_PRF: - for link in Transport.active_links: - if link.link_id == packet.destination_hash: - link.receive(packet) - else: - if packet.destination_type == RNS.Destination.LINK: + with Transport.active_links_lock: for link in Transport.active_links: if link.link_id == packet.destination_hash: - packet.link = link + link.receive(packet) + break + else: + if packet.destination_type == RNS.Destination.LINK: + with Transport.active_links_lock: + for link in Transport.active_links: + if link.link_id == packet.destination_hash: + packet.link = link + break - if len(packet.data) == RNS.PacketReceipt.EXPL_LENGTH: - proof_hash = packet.data[:RNS.Identity.HASHLENGTH//8] - else: - proof_hash = None + if len(packet.data) == RNS.PacketReceipt.EXPL_LENGTH: proof_hash = packet.data[:RNS.Identity.HASHLENGTH//8] + else: proof_hash = None # Check if this proof needs to be transported if (RNS.Reticulum.transport_enabled() or from_local_client or proof_for_local_client) and packet.destination_hash in Transport.reverse_table: - reverse_entry = Transport.reverse_table.pop(packet.destination_hash) + with Transport.reverse_table_lock: reverse_entry = Transport.reverse_table.pop(packet.destination_hash) if packet.receiving_interface == reverse_entry[IDX_RT_OUTB_IF]: RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[IDX_RT_RCVD_IF]), RNS.LOG_EXTREME) new_raw = packet.raw[0:1] @@ -2089,20 +2106,21 @@ class Transport: else: RNS.log("Proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) - for receipt in Transport.receipts: - receipt_validated = False - if proof_hash != None: - # Only test validation if hash matches - if receipt.hash == proof_hash: + with Transport.receipts_lock: + for receipt in Transport.receipts: + receipt_validated = False + if proof_hash != None: + # Only test validation if hash matches + if receipt.hash == proof_hash: + receipt_validated = receipt.validate_proof_packet(packet) + else: + # In case of an implicit proof, we have + # to check every single outstanding receipt receipt_validated = receipt.validate_proof_packet(packet) - else: - # In case of an implicit proof, we have - # to check every single outstanding receipt - receipt_validated = receipt.validate_proof_packet(packet) - if receipt_validated: - if receipt in Transport.receipts: - Transport.receipts.remove(receipt) + if receipt_validated: + if receipt in Transport.receipts: + Transport.receipts.remove(receipt) @staticmethod def synthesize_tunnel(interface): @@ -2151,9 +2169,10 @@ class Transport: @staticmethod def void_tunnel_interface(tunnel_id): - if tunnel_id in Transport.tunnels: - RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_EXTREME) - Transport.tunnels[tunnel_id][IDX_TT_IF] = None + with Transport.tunnels_lock: + if tunnel_id in Transport.tunnels: + RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_EXTREME) + Transport.tunnels[tunnel_id][IDX_TT_IF] = None @staticmethod def handle_tunnel(tunnel_id, interface): @@ -2161,9 +2180,10 @@ class Transport: if not tunnel_id in Transport.tunnels: RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" established.", RNS.LOG_DEBUG) paths = {} - tunnel_entry = [tunnel_id, interface, paths, expires] - interface.tunnel_id = tunnel_id - Transport.tunnels[tunnel_id] = tunnel_entry + with Transport.tunnels_lock: + tunnel_entry = [tunnel_id, interface, paths, expires] + interface.tunnel_id = tunnel_id + Transport.tunnels[tunnel_id] = tunnel_entry else: RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" reappeared. Restoring paths...", RNS.LOG_DEBUG) tunnel_entry = Transport.tunnels[tunnel_id] @@ -2173,36 +2193,38 @@ class Transport: paths = tunnel_entry[IDX_TT_PATHS] deprecated_paths = [] - for destination_hash, path_entry in paths.items(): - received_from = path_entry[1] - announce_hops = path_entry[2] - expires = path_entry[3] - random_blobs = list(set(path_entry[4])) - receiving_interface = interface - packet_hash = path_entry[6] - new_entry = [time.time(), received_from, announce_hops, expires, random_blobs, receiving_interface, packet_hash] + with Transport.tunnels_lock: + for destination_hash, path_entry in paths.items(): + received_from = path_entry[1] + announce_hops = path_entry[2] + expires = path_entry[3] + random_blobs = list(set(path_entry[4])) + receiving_interface = interface + packet_hash = path_entry[6] + new_entry = [time.time(), received_from, announce_hops, expires, random_blobs, receiving_interface, packet_hash] - should_add = False - if destination_hash in Transport.path_table: - old_entry = Transport.path_table[destination_hash] - old_hops = old_entry[IDX_PT_HOPS] - old_expires = old_entry[IDX_PT_EXPIRES] - if announce_hops <= old_hops or time.time() > old_expires: should_add = True - else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because a newer path with fewer hops exist", RNS.LOG_DEBUG) - - else: - if time.time() < expires: should_add = True - else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because it has expired", RNS.LOG_DEBUG) + should_add = False + with Transport.path_table_lock: + if destination_hash in Transport.path_table: + old_entry = Transport.path_table[destination_hash] + old_hops = old_entry[IDX_PT_HOPS] + old_expires = old_entry[IDX_PT_EXPIRES] + if announce_hops <= old_hops or time.time() > old_expires: should_add = True + else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because a newer path with fewer hops exist", RNS.LOG_DEBUG) + + else: + if time.time() < expires: should_add = True + else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because it has expired", RNS.LOG_DEBUG) - if should_add: - Transport.path_table[destination_hash] = new_entry - RNS.log("Restored path to "+RNS.prettyhexrep(destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_DEBUG) - else: - deprecated_paths.append(destination_hash) + if should_add: + with Transport.path_table_lock: Transport.path_table[destination_hash] = new_entry + RNS.log("Restored path to "+RNS.prettyhexrep(destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_DEBUG) + + else: deprecated_paths.append(destination_hash) for deprecated_path in deprecated_paths: RNS.log("Removing path to "+RNS.prettyhexrep(deprecated_path)+" from tunnel "+RNS.prettyhexrep(tunnel_id), RNS.LOG_DEBUG) - paths.pop(deprecated_path) + with Transport.tunnels_lock: paths.pop(deprecated_path) @staticmethod def register_destination(destination): @@ -2302,8 +2324,8 @@ class Transport: def clean_announce_cache(): st = time.time() target_path = os.path.join(RNS.Reticulum.cachepath, "announces") - active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table] - tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict])) + with Transport.path_table_lock: active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table] + with Transport.tunnels_lock: tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict])) removed = 0 for packet_hash in os.listdir(target_path): remove = False @@ -2314,8 +2336,7 @@ class Transport: if (not target_hash in active_paths) and (not target_hash in tunnel_paths): remove = True if remove: os.unlink(full_path); removed += 1 - if removed > 0: - RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG) + if removed > 0: RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG) # When caching packets to storage, they are written # exactly as they arrived over their interface. This @@ -2447,8 +2468,8 @@ class Transport: if next_hop_interface != None: if next_hop_interface.AUTOCONFIGURE_MTU or next_hop_interface.FIXED_MTU: return next_hop_interface.HW_MTU else: return None - else: - return None + + else: return None @staticmethod def next_hop_per_bit_latency(destination_hash): @@ -2511,7 +2532,8 @@ class Transport: def path_is_unresponsive(destination_hash): with Transport.path_states_lock: if destination_hash in Transport.path_states: - if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: return True + if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: + return True return False @@ -2757,7 +2779,8 @@ class Transport: held_entry = Transport.announce_table[packet.destination_hash] Transport.held_announces[packet.destination_hash] = held_entry - Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface] + with Transport.announce_table_lock: + Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface] elif is_from_local_client: # Forward path request on all interfaces @@ -2776,7 +2799,7 @@ class Transport: # except the requestor interface RNS.log("Attempting to discover unknown path to "+RNS.prettyhexrep(destination_hash)+" on behalf of path request"+interface_str, RNS.LOG_DEBUG) pr_entry = { "destination_hash": destination_hash, "timeout": time.time()+Transport.PATH_REQUEST_TIMEOUT, "requesting_interface": attached_interface } - Transport.discovery_path_requests[destination_hash] = pr_entry + with Transport.discovery_pr_lock: Transport.discovery_path_requests[destination_hash] = pr_entry for interface in Transport.interfaces: if not interface == attached_interface: @@ -2876,11 +2899,11 @@ class Transport: @staticmethod def shared_connection_disappeared(): - for link in Transport.active_links: - link.teardown() + with Transport.active_links_lock: + for link in Transport.active_links: link.teardown() - for link in Transport.pending_links: - link.teardown() + with Transport.pending_links_lock: + for link in Transport.pending_links: link.teardown() Transport.announce_table = {} Transport.path_table = {} @@ -2892,11 +2915,10 @@ class Transport: @staticmethod def shared_connection_reappeared(): if Transport.owner.is_connected_to_shared_instance: - for registered_destination in Transport.destinations: + for registered_destination in Transport.destinations.copy(): if registered_destination.type == RNS.Destination.SINGLE: registered_destination.announce(path_response=True) - @staticmethod def drop_announce_queues(): for interface in Transport.interfaces: