From fa353fb0b315a95a0c402e4cbbed724e2a37ae06 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 12 Apr 2026 13:33:15 +0200 Subject: [PATCH] Refactored transport jobs for free-threaded implementation --- RNS/Transport.py | 651 ++++++++++++++++++++++++----------------------- 1 file changed, 332 insertions(+), 319 deletions(-) diff --git a/RNS/Transport.py b/RNS/Transport.py index 45f2ead..be457cb 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -124,8 +124,14 @@ class Transport: path_table_lock = Lock() reverse_table_lock = Lock() link_table_lock = Lock() + active_links_lock = Lock() + pending_links_lock = Lock() tunnels_lock = Lock() + receipts_lock = Lock() discovery_pr_lock = Lock() + discovery_pr_tags_lock = Lock() + path_requests_lock = Lock() + pending_local_prs_lock = Lock() path_states_lock = Lock() jobs_lock = Lock() @@ -470,118 +476,128 @@ class Transport: Transport.jobs_running = True try: - if not Transport.jobs_locked: + with Transport.jobs_lock: should_collect = False # Process active and pending link lists if time.time() > Transport.links_last_checked+Transport.links_check_interval: - for link in Transport.pending_links: - if link.status == RNS.Link.CLOSED: - # If we are not a Transport Instance, finding a pending link - # that was never activated will trigger an expiry of the path - # to the destination, and an attempt to rediscover the path. - if not RNS.Reticulum.transport_enabled(): - Transport.expire_path(link.destination.hash) + with Transport.pending_links_lock: + for link in Transport.pending_links: + if link.status == RNS.Link.CLOSED: + # If we are not a Transport Instance, finding a pending link + # that was never activated will trigger an expiry of the path + # to the destination, and an attempt to rediscover the path. + if not RNS.Reticulum.transport_enabled(): + Transport.expire_path(link.destination.hash) - # If we are connected to a shared instance, it will take - # care of sending out a new path request. If not, we will - # send one directly. - if not Transport.owner.is_connected_to_shared_instance: - last_path_request = 0 - if link.destination.hash in Transport.path_requests: - last_path_request = Transport.path_requests[link.destination.hash] + # If we are connected to a shared instance, it will take + # care of sending out a new path request. If not, we will + # send one directly. + if not Transport.owner.is_connected_to_shared_instance: + last_path_request = 0 + with Transport.path_requests_lock: + if link.destination.hash in Transport.path_requests: + last_path_request = Transport.path_requests[link.destination.hash] - if time.time() - last_path_request > Transport.PATH_REQUEST_MI: - RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link.destination.hash)+" since an attempted link was never established", RNS.LOG_DEBUG) - if not link.destination.hash in path_requests: - blocked_if = None - path_requests[link.destination.hash] = blocked_if + if time.time() - last_path_request > Transport.PATH_REQUEST_MI: + RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link.destination.hash)+" since an attempted link was never established", RNS.LOG_DEBUG) + if not link.destination.hash in path_requests: + blocked_if = None + path_requests[link.destination.hash] = blocked_if - Transport.pending_links.remove(link) + Transport.pending_links.remove(link) - for link in Transport.active_links: - if link.status == RNS.Link.CLOSED: - Transport.active_links.remove(link) + with Transport.active_links_lock: + closed_links = [] + for link in Transport.active_links: + if link.status == RNS.Link.CLOSED: closed_links.append(link) + + for closed_link in closed_links: Transport.active_links.remove(closed_link) Transport.links_last_checked = time.time() # Process receipts list for timed-out packets if time.time() > Transport.receipts_last_checked+Transport.receipts_check_interval: - while len(Transport.receipts) > Transport.MAX_RECEIPTS: - culled_receipt = Transport.receipts.pop(0) - culled_receipt.timeout = -1 - culled_receipt.check_timeout() - should_collect = True + with Transport.receipts_lock: + while len(Transport.receipts) > Transport.MAX_RECEIPTS: + culled_receipt = Transport.receipts.pop(0) + culled_receipt.timeout = -1 + culled_receipt.check_timeout() + should_collect = True - for receipt in Transport.receipts: - receipt.check_timeout() - if receipt.status != RNS.PacketReceipt.SENT: - if receipt in Transport.receipts: - Transport.receipts.remove(receipt) + with Transport.receipts_lock: + expired_receipts = [] + for receipt in Transport.receipts: + receipt.check_timeout() + if receipt.status != RNS.PacketReceipt.SENT: expired_receipts.append(receipt) + + for expired_receipt in expired_receipts: + if expired_receipt in Transport.receipts: Transport.receipts.remove(expired_receipt) Transport.receipts_last_checked = time.time() # Process announces needing retransmission if time.time() > Transport.announces_last_checked+Transport.announces_check_interval: completed_announces = [] - for destination_hash in Transport.announce_table: - announce_entry = Transport.announce_table[destination_hash] - if announce_entry[IDX_AT_RETRIES] > 0 and announce_entry[IDX_AT_RETRIES] >= Transport.LOCAL_REBROADCASTS_MAX: - RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME) - completed_announces.append(destination_hash) - elif announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R: - RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME) - completed_announces.append(destination_hash) - else: - if time.time() > announce_entry[IDX_AT_RTRNS_TMO]: - announce_entry[IDX_AT_RTRNS_TMO] = time.time() + Transport.PATHFINDER_G + Transport.PATHFINDER_RW - announce_entry[IDX_AT_RETRIES] += 1 - packet = announce_entry[IDX_AT_PACKET] - block_rebroadcasts = announce_entry[IDX_AT_BLCK_RBRD] - attached_interface = announce_entry[IDX_AT_ATTCHD_IF] - announce_context = RNS.Packet.NONE - if block_rebroadcasts: announce_context = RNS.Packet.PATH_RESPONSE - announce_data = packet.data - announce_identity = RNS.Identity.recall(packet.destination_hash) - announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); - announce_destination.hash = packet.destination_hash - announce_destination.hexhash = announce_destination.hash.hex() - - new_packet = RNS.Packet( - announce_destination, - announce_data, - RNS.Packet.ANNOUNCE, - context = announce_context, - header_type = RNS.Packet.HEADER_2, - transport_type = Transport.TRANSPORT, - transport_id = Transport.identity.hash, - attached_interface = attached_interface, - context_flag = packet.context_flag, - ) + + with Transport.announce_table_lock: + for destination_hash in Transport.announce_table: + announce_entry = Transport.announce_table[destination_hash] + if announce_entry[IDX_AT_RETRIES] > 0 and announce_entry[IDX_AT_RETRIES] >= Transport.LOCAL_REBROADCASTS_MAX: + RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME) + completed_announces.append(destination_hash) + elif announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R: + RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME) + completed_announces.append(destination_hash) + else: + if time.time() > announce_entry[IDX_AT_RTRNS_TMO]: + announce_entry[IDX_AT_RTRNS_TMO] = time.time() + Transport.PATHFINDER_G + Transport.PATHFINDER_RW + announce_entry[IDX_AT_RETRIES] += 1 + packet = announce_entry[IDX_AT_PACKET] + block_rebroadcasts = announce_entry[IDX_AT_BLCK_RBRD] + attached_interface = announce_entry[IDX_AT_ATTCHD_IF] + announce_context = RNS.Packet.NONE + if block_rebroadcasts: announce_context = RNS.Packet.PATH_RESPONSE + announce_data = packet.data + announce_identity = RNS.Identity.recall(packet.destination_hash) + announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); + announce_destination.hash = packet.destination_hash + announce_destination.hexhash = announce_destination.hash.hex() + + new_packet = RNS.Packet( + announce_destination, + announce_data, + RNS.Packet.ANNOUNCE, + context = announce_context, + header_type = RNS.Packet.HEADER_2, + transport_type = Transport.TRANSPORT, + transport_id = Transport.identity.hash, + attached_interface = attached_interface, + context_flag = packet.context_flag, + ) - new_packet.hops = announce_entry[4] - if block_rebroadcasts: - RNS.log("Rebroadcasting announce as path response for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) - else: - RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) - - outgoing.append(new_packet) + new_packet.hops = announce_entry[4] + if block_rebroadcasts: + RNS.log("Rebroadcasting announce as path response for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) + else: + RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG) + + outgoing.append(new_packet) - # This handles an edge case where a peer sends a past - # request for a destination just after an announce for - # said destination has arrived, but before it has been - # rebroadcast locally. In such a case the actual announce - # is temporarily held, and then reinserted when the path - # request has been served to the peer. - if destination_hash in Transport.held_announces: - held_entry = Transport.held_announces.pop(destination_hash) - Transport.announce_table[destination_hash] = held_entry - RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG) + # This handles an edge case where a peer sends a past + # request for a destination just after an announce for + # said destination has arrived, but before it has been + # rebroadcast locally. In such a case the actual announce + # is temporarily held, and then reinserted when the path + # request has been served to the peer. + if destination_hash in Transport.held_announces: + held_entry = Transport.held_announces.pop(destination_hash) + Transport.announce_table[destination_hash] = held_entry + RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG) - for destination_hash in completed_announces: - if destination_hash in Transport.announce_table: - Transport.announce_table.pop(destination_hash) + for destination_hash in completed_announces: + if destination_hash in Transport.announce_table: Transport.announce_table.pop(destination_hash) Transport.announces_last_checked = time.time() @@ -593,233 +609,244 @@ class Transport: # Cull invalidated path requests if time.time() > Transport.pending_prs_last_checked+Transport.pending_prs_check_interval: - for destination_hash in Transport.pending_local_path_requests.copy(): - if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces: - Transport.pending_local_path_requests.pop(destination_hash) + with Transport.pending_local_prs_lock: + for destination_hash in Transport.pending_local_path_requests: + if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces: + Transport.pending_local_path_requests.pop(destination_hash) Transport.pending_prs_last_checked = time.time() # Cull the path request tags list if it has reached its max size if len(Transport.discovery_pr_tags) > Transport.max_pr_tags: - Transport.discovery_pr_tags = Transport.discovery_pr_tags[len(Transport.discovery_pr_tags)-Transport.max_pr_tags:len(Transport.discovery_pr_tags)-1] + with Transport.discovery_pr_tags_lock: + Transport.discovery_pr_tags = Transport.discovery_pr_tags[len(Transport.discovery_pr_tags)-Transport.max_pr_tags:len(Transport.discovery_pr_tags)-1] if time.time() > Transport.tables_last_culled + Transport.tables_cull_interval: # Remove unneeded path state entries stale_path_states = [] - for destination_hash in Transport.path_states: - if not destination_hash in Transport.path_table: - stale_path_states.append(destination_hash) + with Transport.path_states_lock: + for destination_hash in Transport.path_states: + if not destination_hash in Transport.path_table: + stale_path_states.append(destination_hash) # Cull the reverse table according to timeout stale_reverse_entries = [] - for truncated_packet_hash in Transport.reverse_table: - reverse_entry = Transport.reverse_table[truncated_packet_hash] - if time.time() > reverse_entry[IDX_RT_TIMESTAMP] + Transport.REVERSE_TIMEOUT: - stale_reverse_entries.append(truncated_packet_hash) - elif not reverse_entry[IDX_RT_OUTB_IF] in Transport.interfaces: - stale_reverse_entries.append(truncated_packet_hash) - elif not reverse_entry[IDX_RT_RCVD_IF] in Transport.interfaces: - stale_reverse_entries.append(truncated_packet_hash) + with Transport.reverse_table_lock: + for truncated_packet_hash in Transport.reverse_table: + reverse_entry = Transport.reverse_table[truncated_packet_hash] + if time.time() > reverse_entry[IDX_RT_TIMESTAMP] + Transport.REVERSE_TIMEOUT: stale_reverse_entries.append(truncated_packet_hash) + elif not reverse_entry[IDX_RT_OUTB_IF] in Transport.interfaces: stale_reverse_entries.append(truncated_packet_hash) + elif not reverse_entry[IDX_RT_RCVD_IF] in Transport.interfaces: stale_reverse_entries.append(truncated_packet_hash) # Cull the link table according to timeout stale_links = [] - for link_id in Transport.link_table: - link_entry = Transport.link_table[link_id] + with Transport.link_table_lock: + for link_id in Transport.link_table: + link_entry = Transport.link_table[link_id] - if link_entry[IDX_LT_VALIDATED] == True: - if time.time() > link_entry[IDX_LT_TIMESTAMP] + Transport.LINK_TIMEOUT: - stale_links.append(link_id) - elif not link_entry[IDX_LT_NH_IF] in Transport.interfaces: - stale_links.append(link_id) - elif not link_entry[IDX_LT_RCVD_IF] in Transport.interfaces: - stale_links.append(link_id) - else: - if time.time() > link_entry[IDX_LT_PROOF_TMO]: - stale_links.append(link_id) + if link_entry[IDX_LT_VALIDATED] == True: + if time.time() > link_entry[IDX_LT_TIMESTAMP] + Transport.LINK_TIMEOUT: stale_links.append(link_id) + elif not link_entry[IDX_LT_NH_IF] in Transport.interfaces: stale_links.append(link_id) + elif not link_entry[IDX_LT_RCVD_IF] in Transport.interfaces: stale_links.append(link_id) + + else: + if time.time() > link_entry[IDX_LT_PROOF_TMO]: + stale_links.append(link_id) - last_path_request = 0 - if link_entry[IDX_LT_DSTHASH] in Transport.path_requests: - last_path_request = Transport.path_requests[link_entry[IDX_LT_DSTHASH]] + last_path_request = 0 + with Transport.path_requests_lock: + if link_entry[IDX_LT_DSTHASH] in Transport.path_requests: + last_path_request = Transport.path_requests[link_entry[IDX_LT_DSTHASH]] - lr_taken_hops = link_entry[IDX_LT_HOPS] + lr_taken_hops = link_entry[IDX_LT_HOPS] - path_request_throttle = time.time() - last_path_request < Transport.PATH_REQUEST_MI - path_request_conditions = False - - # If the path has been invalidated between the time of - # making the link request and now, try to rediscover it - if not Transport.has_path(link_entry[IDX_LT_DSTHASH]): - RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and path is now missing", RNS.LOG_DEBUG) - path_request_conditions =True + path_request_throttle = time.time() - last_path_request < Transport.PATH_REQUEST_MI + path_request_conditions = False + + # If the path has been invalidated between the time of + # making the link request and now, try to rediscover it + if not Transport.has_path(link_entry[IDX_LT_DSTHASH]): + RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and path is now missing", RNS.LOG_DEBUG) + path_request_conditions = True - # If this link request was originated from a local client - # attempt to rediscover a path to the destination, if this - # has not already happened recently. - elif not path_request_throttle and lr_taken_hops == 0: - RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted local client link was never established", RNS.LOG_DEBUG) - path_request_conditions = True + # If this link request was originated from a local client + # attempt to rediscover a path to the destination, if this + # has not already happened recently. + elif not path_request_throttle and lr_taken_hops == 0: + RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted local client link was never established", RNS.LOG_DEBUG) + path_request_conditions = True - # If the link destination was previously only 1 hop - # away, this likely means that it was local to one - # of our interfaces, and that it roamed somewhere else. - # In that case, try to discover a new path, and mark - # the old one as unresponsive. - elif not path_request_throttle and Transport.hops_to(link_entry[IDX_LT_DSTHASH]) == 1: - RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and destination was previously local to an interface on this instance", RNS.LOG_DEBUG) - path_request_conditions = True - blocked_if = link_entry[IDX_LT_RCVD_IF] + # If the link destination was previously only 1 hop + # away, this likely means that it was local to one + # of our interfaces, and that it roamed somewhere else. + # In that case, try to discover a new path, and mark + # the old one as unresponsive. + elif not path_request_throttle and Transport.hops_to(link_entry[IDX_LT_DSTHASH]) == 1: + RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and destination was previously local to an interface on this instance", RNS.LOG_DEBUG) + path_request_conditions = True + blocked_if = link_entry[IDX_LT_RCVD_IF] - # TODO: This might result in the path re-resolution - # only being able to happen once, since new path found - # after allowing update from higher hop-count path, after - # marking old path unresponsive, might be more than 1 hop away, - # thus dealocking us into waiting for a new announce all-together. - # Is this problematic, or does it actually not matter? - # Best would be to have full support for alternative paths, - # and score them according to number of unsuccessful tries or - # similar. - if RNS.Reticulum.transport_enabled(): - if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: - Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH]) + # TODO: This might result in the path re-resolution + # only being able to happen once, since new path found + # after allowing update from higher hop-count path, after + # marking old path unresponsive, might be more than 1 hop away, + # thus dealocking us into waiting for a new announce all-together. + # Is this problematic, or does it actually not matter? + # Best would be to have full support for alternative paths, + # and score them according to number of unsuccessful tries or + # similar. + if RNS.Reticulum.transport_enabled(): + if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: + Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH]) - # If the link initiator is only 1 hop away, - # this likely means that network topology has - # changed. In that case, we try to discover a new path, - # and mark the old one as potentially unresponsive. - elif not path_request_throttle and lr_taken_hops == 1: - RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and link initiator is local to an interface on this instance", RNS.LOG_DEBUG) - path_request_conditions = True - blocked_if = link_entry[IDX_LT_RCVD_IF] + # If the link initiator is only 1 hop away, + # this likely means that network topology has + # changed. In that case, we try to discover a new path, + # and mark the old one as potentially unresponsive. + elif not path_request_throttle and lr_taken_hops == 1: + RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and link initiator is local to an interface on this instance", RNS.LOG_DEBUG) + path_request_conditions = True + blocked_if = link_entry[IDX_LT_RCVD_IF] - if RNS.Reticulum.transport_enabled(): - if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: - Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH]) + if RNS.Reticulum.transport_enabled(): + if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: + Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH]) - if path_request_conditions: - if not link_entry[IDX_LT_DSTHASH] in path_requests: - path_requests[link_entry[IDX_LT_DSTHASH]] = blocked_if + if path_request_conditions: + with Transport.path_requests_lock: + if not link_entry[IDX_LT_DSTHASH] in path_requests: + path_requests[link_entry[IDX_LT_DSTHASH]] = blocked_if - if not RNS.Reticulum.transport_enabled(): - # Drop current path if we are not a transport instance, to - # allow using higher-hop count paths or reused announces - # from newly adjacent transport instances. - Transport.expire_path(link_entry[IDX_LT_DSTHASH]) + if not RNS.Reticulum.transport_enabled(): + # Drop current path if we are not a transport instance, to + # allow using higher-hop count paths or reused announces + # from newly adjacent transport instances. + Transport.expire_path(link_entry[IDX_LT_DSTHASH]) # Cull the path table stale_paths = [] - for destination_hash in Transport.path_table: - destination_entry = Transport.path_table[destination_hash] - attached_interface = destination_entry[IDX_PT_RVCD_IF] + with Transport.path_table_lock: + for destination_hash in Transport.path_table: + destination_entry = Transport.path_table[destination_hash] + attached_interface = destination_entry[IDX_PT_RVCD_IF] - if attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: - destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.AP_PATH_TIME - elif attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: - destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.ROAMING_PATH_TIME - else: - destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.DESTINATION_TIMEOUT + if attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: + destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.AP_PATH_TIME + elif attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: + destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.ROAMING_PATH_TIME + else: + destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.DESTINATION_TIMEOUT - if time.time() > destination_expiry: - stale_paths.append(destination_hash) - should_collect = True - RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG) - elif not attached_interface in Transport.interfaces: - stale_paths.append(destination_hash) - should_collect = True - RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG) + if time.time() > destination_expiry: + stale_paths.append(destination_hash) + should_collect = True + RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG) + elif not attached_interface in Transport.interfaces: + stale_paths.append(destination_hash) + should_collect = True + RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG) # Cull the pending discovery path requests table stale_discovery_path_requests = [] - for destination_hash in Transport.discovery_path_requests: - entry = Transport.discovery_path_requests[destination_hash] + with Transport.discovery_pr_lock: + for destination_hash in Transport.discovery_path_requests: + entry = Transport.discovery_path_requests[destination_hash] - if time.time() > entry["timeout"]: - stale_discovery_path_requests.append(destination_hash) - should_collect = True - RNS.log("Waiting path request for "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG) + if time.time() > entry["timeout"]: + stale_discovery_path_requests.append(destination_hash) + should_collect = True + RNS.log("Waiting path request for "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG) # Cull the tunnel table stale_tunnels = []; ti = 0 - for tunnel_id in Transport.tunnels: - tunnel_entry = Transport.tunnels[tunnel_id] + with Transport.tunnels_lock: + for tunnel_id in Transport.tunnels: + tunnel_entry = Transport.tunnels[tunnel_id] - expires = tunnel_entry[IDX_TT_EXPIRES] - if time.time() > expires: - stale_tunnels.append(tunnel_id) - should_collect = True - RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME) - else: - if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces: - RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_EXTREME) - tunnel_entry[IDX_TT_IF] = None + expires = tunnel_entry[IDX_TT_EXPIRES] + if time.time() > expires: + stale_tunnels.append(tunnel_id) + should_collect = True + RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME) + + else: + if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces: + RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_EXTREME) + tunnel_entry[IDX_TT_IF] = None - stale_tunnel_paths = [] - tunnel_paths = tunnel_entry[IDX_TT_PATHS] - for tunnel_path in tunnel_paths: - tunnel_path_entry = tunnel_paths[tunnel_path] + stale_tunnel_paths = [] + tunnel_paths = tunnel_entry[IDX_TT_PATHS] + for tunnel_path in tunnel_paths: + tunnel_path_entry = tunnel_paths[tunnel_path] - if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT: - stale_tunnel_paths.append(tunnel_path) - should_collect = True - RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" timed out and was removed", RNS.LOG_EXTREME) - - for tunnel_path in stale_tunnel_paths: - tunnel_paths.pop(tunnel_path) - ti += 1 + if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT: + stale_tunnel_paths.append(tunnel_path) + should_collect = True + RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" timed out and was removed", RNS.LOG_EXTREME) + for tunnel_path in stale_tunnel_paths: + tunnel_paths.pop(tunnel_path) + ti += 1 if ti > 0: if ti == 1: RNS.log("Removed "+str(ti)+" tunnel path", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(ti)+" tunnel paths", RNS.LOG_EXTREME) i = 0 - for truncated_packet_hash in stale_reverse_entries: - Transport.reverse_table.pop(truncated_packet_hash) - i += 1 + with Transport.reverse_table_lock: + for truncated_packet_hash in stale_reverse_entries: + if truncated_packet_hash in Transport.reverse_table: Transport.reverse_table.pop(truncated_packet_hash) + i += 1 if i > 0: if i == 1: RNS.log("Released "+str(i)+" reverse table entry", RNS.LOG_EXTREME) else: RNS.log("Released "+str(i)+" reverse table entries", RNS.LOG_EXTREME) i = 0 - for link_id in stale_links: - Transport.link_table.pop(link_id) - i += 1 + with Transport.link_table_lock: + for link_id in stale_links: + Transport.link_table.pop(link_id) + i += 1 if i > 0: if i == 1: RNS.log("Released "+str(i)+" link", RNS.LOG_EXTREME) else: RNS.log("Released "+str(i)+" links", RNS.LOG_EXTREME) i = 0 - for destination_hash in stale_paths: - Transport.path_table.pop(destination_hash) - i += 1 + with Transport.path_table_lock: + for destination_hash in stale_paths: + if destination_hash in Transport.path_table: Transport.path_table.pop(destination_hash) + i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" path", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(i)+" paths", RNS.LOG_EXTREME) i = 0 - for destination_hash in stale_discovery_path_requests: - Transport.discovery_path_requests.pop(destination_hash) - i += 1 + with Transport.discovery_pr_lock: + for destination_hash in stale_discovery_path_requests: + Transport.discovery_path_requests.pop(destination_hash) + i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" waiting path request", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(i)+" waiting path requests", RNS.LOG_EXTREME) i = 0 - for tunnel_id in stale_tunnels: - Transport.tunnels.pop(tunnel_id) - i += 1 + with Transport.tunnels_lock: + for tunnel_id in stale_tunnels: + Transport.tunnels.pop(tunnel_id) + i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" tunnel", RNS.LOG_EXTREME) else: RNS.log("Removed "+str(i)+" tunnels", RNS.LOG_EXTREME) i = 0 - for destination_hash in stale_path_states: - Transport.path_states.pop(destination_hash) - i += 1 + with Transport.path_states_lock: + for destination_hash in stale_path_states: + Transport.path_states.pop(destination_hash) + i += 1 if i > 0: if i == 1: RNS.log("Removed "+str(i)+" path state entry", RNS.LOG_EXTREME) @@ -830,9 +857,13 @@ class Transport: # Run interface-related jobs if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval: Transport.prioritize_interfaces() - for interface in Transport.interfaces: - interface.process_held_announces() - Transport.interface_last_jobs = time.time() + try: + for interface in Transport.interfaces: + interface.process_held_announces() + Transport.interface_last_jobs = time.time() + except Exception as e: + RNS.log(f"Error while processing held per-interface announces: {e}", RNS.LOG_WARNING) + RNS.log(f"Postponing until next job run", RNS.LOG_WARNING) # Clean packet caches if time.time() > Transport.cache_last_cleaned+Transport.cache_clean_interval: @@ -877,10 +908,6 @@ class Transport: if should_collect: gc.collect() - else: - # Transport jobs were locked, do nothing - pass - except Exception as e: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -1754,22 +1781,23 @@ class Transport: # If this is a path response from a local client, # check if any external interfaces have pending # path requests. - if packet.destination_hash in Transport.pending_local_path_requests: - desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash) - retransmit_timeout = now - retries = Transport.PATHFINDER_R + with Transport.pending_local_prs_lock: + if packet.destination_hash in Transport.pending_local_path_requests: + desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash) + retransmit_timeout = now + retries = Transport.PATHFINDER_R - Transport.announce_table[packet.destination_hash] = [ - now, - retransmit_timeout, - retries, - received_from, - announce_hops, - packet, - local_rebroadcasts, - block_rebroadcasts, - attached_interface - ] + Transport.announce_table[packet.destination_hash] = [ + now, + retransmit_timeout, + retries, + received_from, + announce_hops, + packet, + local_rebroadcasts, + block_rebroadcasts, + attached_interface + ] # If we have any local clients connected, we re- # transmit the announce to them immediately @@ -2467,44 +2495,45 @@ class Transport: @staticmethod def expire_path(destination_hash): - if destination_hash in Transport.path_table: - Transport.path_table[destination_hash][IDX_PT_TIMESTAMP] = 0 - Transport.tables_last_culled = 0 - return True - else: - return False + with Transport.path_table_lock: + if destination_hash in Transport.path_table: + Transport.path_table[destination_hash][IDX_PT_TIMESTAMP] = 0 + Transport.tables_last_culled = 0 + return True + + else: return False @staticmethod def mark_path_unresponsive(destination_hash): if destination_hash in Transport.path_table: - Transport.path_states[destination_hash] = Transport.STATE_UNRESPONSIVE + with Transport.path_states_lock: Transport.path_states[destination_hash] = Transport.STATE_UNRESPONSIVE return True - else: - return False + + else: return False @staticmethod def mark_path_responsive(destination_hash): if destination_hash in Transport.path_table: - Transport.path_states[destination_hash] = Transport.STATE_RESPONSIVE + with Transport.path_states_lock: Transport.path_states[destination_hash] = Transport.STATE_RESPONSIVE return True - else: - return False + + else: return False @staticmethod def mark_path_unknown_state(destination_hash): if destination_hash in Transport.path_table: - Transport.path_states[destination_hash] = Transport.STATE_UNKNOWN + with Transport.path_states_lock: Transport.path_states[destination_hash] = Transport.STATE_UNKNOWN return True - else: - return False + + else: return False @staticmethod def path_is_unresponsive(destination_hash): - if destination_hash in Transport.path_states: - if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: - return True + with Transport.path_states_lock: + if destination_hash in Transport.path_states: + if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: return True - return False + return False @staticmethod def await_path(destination_hash, timeout=None, on_interface=None): @@ -2533,28 +2562,19 @@ class Transport: :param destination_hash: A destination hash as *bytes*. :param on_interface: If specified, the path request will only be sent on this interface. In normal use, Reticulum handles this automatically, and this parameter should not be used. """ - if tag == None: - request_tag = RNS.Identity.get_random_hash() - else: - request_tag = tag + if tag == None: request_tag = RNS.Identity.get_random_hash() + else: request_tag = tag - if RNS.Reticulum.transport_enabled(): - path_request_data = destination_hash+Transport.identity.hash+request_tag - else: - path_request_data = destination_hash+request_tag + if RNS.Reticulum.transport_enabled(): path_request_data = destination_hash+Transport.identity.hash+request_tag + else: path_request_data = destination_hash+request_tag path_request_dst = RNS.Destination(None, RNS.Destination.OUT, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request") packet = RNS.Packet(path_request_dst, path_request_data, packet_type = RNS.Packet.DATA, transport_type = RNS.Transport.BROADCAST, header_type = RNS.Packet.HEADER_1, attached_interface = on_interface) if on_interface != None and recursive: - if not hasattr(on_interface, "announce_cap"): - on_interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP - - if not hasattr(on_interface, "announce_allowed_at"): - on_interface.announce_allowed_at = 0 - - if not hasattr(on_interface, "announce_queue"): - on_interface.announce_queue = [] + if not hasattr(on_interface, "announce_cap"): on_interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP + if not hasattr(on_interface, "announce_allowed_at"): on_interface.announce_allowed_at = 0 + if not hasattr(on_interface, "announce_queue"): on_interface.announce_queue = [] queued_announces = True if len(on_interface.announce_queue) > 0 else False if queued_announces: @@ -2581,8 +2601,7 @@ class Transport: if isinstance(data, list) and len(data) > 0: response = [] response.append(Transport.owner.get_interface_stats()) - if data[0] == True: - response.append(Transport.owner.get_link_count()) + if data[0] == True: response.append(Transport.owner.get_link_count()) return response @@ -2601,10 +2620,8 @@ class Transport: command = data[0] destination_hash = None max_hops = None - if len(data) > 1: - destination_hash = data[1] - if len(data) > 2: - max_hops = data[2] + if len(data) > 1: destination_hash = data[1] + if len(data) > 2: max_hops = data[2] if command == "table": table = Transport.owner.get_path_table(max_hops=max_hops) @@ -2658,25 +2675,21 @@ class Transport: unique_tag = destination_hash+tag_bytes - if not unique_tag in Transport.discovery_pr_tags: - Transport.discovery_pr_tags.append(unique_tag) + with Transport.discovery_pr_tags_lock: + if not unique_tag in Transport.discovery_pr_tags: + Transport.discovery_pr_tags.append(unique_tag) - Transport.path_request( - destination_hash, - Transport.from_local_client(packet), - packet.receiving_interface, - requestor_transport_id = requesting_transport_instance, - tag=tag_bytes - ) + Transport.path_request(destination_hash, + Transport.from_local_client(packet), + packet.receiving_interface, + requestor_transport_id = requesting_transport_instance, + tag=tag_bytes) - else: - RNS.log("Ignoring duplicate path request for "+RNS.prettyhexrep(destination_hash)+" with tag "+RNS.prettyhexrep(unique_tag), RNS.LOG_DEBUG) + else: RNS.log("Ignoring duplicate path request for "+RNS.prettyhexrep(destination_hash)+" with tag "+RNS.prettyhexrep(unique_tag), RNS.LOG_DEBUG) - else: - RNS.log("Ignoring tagless path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) + else: RNS.log("Ignoring tagless path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) - except Exception as e: - RNS.log("Error while handling path request. The contained exception was: "+str(e), RNS.LOG_ERROR) + except Exception as e: RNS.log("Error while handling path request. The contained exception was: "+str(e), RNS.LOG_ERROR) @staticmethod def path_request(destination_hash, is_from_local_client, attached_interface, requestor_transport_id=None, tag=None): @@ -2685,10 +2698,9 @@ class Transport: if attached_interface != None: if RNS.Reticulum.transport_enabled() and attached_interface.mode in RNS.Interfaces.Interface.Interface.DISCOVER_PATHS_FOR: should_search_for_unknown = True - interface_str = " on "+str(attached_interface) - else: - interface_str = "" + + else: interface_str = "" RNS.log("Path request for "+RNS.prettyhexrep(destination_hash)+interface_str, RNS.LOG_DEBUG) @@ -2699,7 +2711,8 @@ class Transport: if Transport.is_local_client_interface(destination_interface): destination_exists_on_local_client = True - Transport.pending_local_path_requests[destination_hash] = attached_interface + with Transport.pending_local_prs_lock: + Transport.pending_local_path_requests[destination_hash] = attached_interface local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None) if local_destination != None: