diff --git a/RNS/Destination.py b/RNS/Destination.py index 6f5b1fc..28251b0 100755 --- a/RNS/Destination.py +++ b/RNS/Destination.py @@ -421,8 +421,7 @@ class Destination: else: if packet.packet_type == RNS.Packet.DATA: if self.callbacks.packet != None: - try: - self.callbacks.packet(plaintext, packet) + try: self.callbacks.packet(plaintext, packet) except Exception as e: RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) diff --git a/RNS/Transport.py b/RNS/Transport.py index be457cb..aa755f0 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -119,8 +119,11 @@ class Transport: discovery_pr_tags = [] # A table for keeping track of tagged path requests max_pr_tags = 32000 # Maximum amount of unique path request tags to remember + destinations_lock = Lock() inbound_announce_lock = Lock() announce_table_lock = Lock() + announce_rate_table_lock = Lock() + announce_handler_lock = Lock() path_table_lock = Lock() reverse_table_lock = Lock() link_table_lock = Lock() @@ -156,8 +159,6 @@ class Transport: pending_local_path_requests = {} start_time = None - jobs_locked = False - jobs_running = False hashlist_maxsize = 1000000 job_interval = 0.250 links_last_checked = 0.0 @@ -193,7 +194,6 @@ class Transport: @staticmethod def start(reticulum_instance): - Transport.jobs_running = True Transport.owner = reticulum_instance if Transport.identity == None: @@ -262,7 +262,6 @@ class Transport: Transport.last_mgmt_announce = time.time() - Transport.mgmt_announce_interval + 15 # Start job loops - Transport.jobs_running = False threading.Thread(target=Transport.jobloop, daemon=True).start() threading.Thread(target=Transport.count_traffic_loop, daemon=True).start() @@ -473,7 +472,6 @@ class Transport: outgoing = [] path_requests = {} blocked_if = None - Transport.jobs_running = True try: with Transport.jobs_lock: @@ -912,8 +910,6 @@ class Transport: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - Transport.jobs_running = False - for packet in outgoing: packet.send() for destination_hash in path_requests: @@ -966,10 +962,6 @@ class Transport: @staticmethod def outbound(packet): - while (Transport.jobs_running): sleep(0.0005) - - Transport.jobs_locked = True - sent = False outbound_time = time.time() @@ -992,31 +984,37 @@ class Transport: if generate_receipt: packet.receipt = RNS.PacketReceipt(packet) - Transport.receipts.append(packet.receipt) + with Transport.receipts_lock: Transport.receipts.append(packet.receipt) # TODO: Enable when caching has been redesigned # Transport.cache(packet) # Check if we have a known path for the destination in the path table if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination.type != RNS.Destination.PLAIN and packet.destination.type != RNS.Destination.GROUP and packet.destination_hash in Transport.path_table: - outbound_interface = Transport.path_table[packet.destination_hash][IDX_PT_RVCD_IF] + with Transport.path_table_lock: + if not packet.destination_hash in Transport.path_table: + RNS.log(f"Dropped packet since path table entry disappeared during outbound processing", RNS.LOG_WARNING) + return False + else: path_entry = Transport.path_table[packet.destination_hash] + + outbound_interface = path_entry[IDX_PT_RVCD_IF] # If there's more than one hop to the destination, and we know # a path, we insert the packet into transport by adding the next # transport nodes address to the header, and modifying the flags. # This rule applies both for "normal" transport, and when connected # to a local shared Reticulum instance. - if Transport.path_table[packet.destination_hash][IDX_PT_HOPS] > 1: + if path_entry[IDX_PT_HOPS] > 1: if packet.header_type == RNS.Packet.HEADER_1: # Insert packet into transport new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111) new_raw = struct.pack("!B", new_flags) new_raw += packet.raw[1:2] - new_raw += Transport.path_table[packet.destination_hash][IDX_PT_NEXT_HOP] + new_raw += path_entry[IDX_PT_NEXT_HOP] new_raw += packet.raw[2:] packet_sent(packet) Transport.transmit(outbound_interface, new_raw) - Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time() + path_entry[IDX_PT_TIMESTAMP] = time.time() sent = True # In the special case where we are connected to a local shared @@ -1026,17 +1024,17 @@ class Transport: # one hop away would just be broadcast directly, but since we # are "behind" a shared instance, we need to get that instance # to transport it onto the network. - elif Transport.path_table[packet.destination_hash][IDX_PT_HOPS] == 1 and Transport.owner.is_connected_to_shared_instance: + elif path_entry[IDX_PT_HOPS] == 1 and Transport.owner.is_connected_to_shared_instance: if packet.header_type == RNS.Packet.HEADER_1: # Insert packet into transport new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111) new_raw = struct.pack("!B", new_flags) new_raw += packet.raw[1:2] - new_raw += Transport.path_table[packet.destination_hash][IDX_PT_NEXT_HOP] + new_raw += path_entry[IDX_PT_NEXT_HOP] new_raw += packet.raw[2:] packet_sent(packet) Transport.transmit(outbound_interface, new_raw) - Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time() + path_entry[IDX_PT_TIMESTAMP] = time.time() sent = True # If none of the above applies, we know the destination is @@ -1058,10 +1056,8 @@ class Transport: should_transmit = True if packet.destination.type == RNS.Destination.LINK: - if packet.destination.status == RNS.Link.CLOSED: - should_transmit = False - if interface != packet.destination.attached_interface: - should_transmit = False + if packet.destination.status == RNS.Link.CLOSED: should_transmit = False + if interface != packet.destination.attached_interface: should_transmit = False if packet.attached_interface != None and interface != packet.attached_interface: should_transmit = False @@ -1073,7 +1069,9 @@ class Transport: should_transmit = False elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: - local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + with Transport.destinations_lock: + local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + if local_destination != None: # RNS.log("Allowing announce broadcast on roaming-mode interface from instance-local destination", RNS.LOG_EXTREME) pass @@ -1094,7 +1092,8 @@ class Transport: should_transmit = False elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: - local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + with Transport.destinations_lock: + local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) if local_destination != None: # RNS.log("Allowing announce broadcast on boundary-mode interface from instance-local destination", RNS.LOG_EXTREME) pass @@ -1117,14 +1116,9 @@ class Transport: # TODO: Rethink whether this is actually optimal. if packet.hops > 0: - if not hasattr(interface, "announce_cap"): - interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP - - if not hasattr(interface, "announce_allowed_at"): - interface.announce_allowed_at = 0 - - if not hasattr(interface, "announce_queue"): - interface.announce_queue = [] + if not hasattr(interface, "announce_cap"): interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP + if not hasattr(interface, "announce_allowed_at"): interface.announce_allowed_at = 0 + if not hasattr(interface, "announce_queue"): interface.announce_queue = [] queued_announces = True if len(interface.announce_queue) > 0 else False if not queued_announces and outbound_time > interface.announce_allowed_at and interface.bitrate != None and interface.bitrate != 0: @@ -1154,13 +1148,11 @@ class Transport: e["raw"] = packet.raw if should_queue: - entry = { - "destination": packet.destination_hash, - "time": outbound_time, - "hops": packet.hops, - "emitted": Transport.announce_emitted(packet), - "raw": packet.raw - } + entry = { "destination": packet.destination_hash, + "time": outbound_time, + "hops": packet.hops, + "emitted": Transport.announce_emitted(packet), + "raw": packet.raw } queued_announces = True if len(interface.announce_queue) > 0 else False interface.announce_queue.append(entry) @@ -1170,10 +1162,8 @@ class Transport: timer = threading.Timer(wait_time, interface.process_announce_queue) timer.start() - if wait_time < 1: - wait_time_str = str(round(wait_time*1000,2))+"ms" - else: - wait_time_str = str(round(wait_time*1,2))+"s" + if wait_time < 1: wait_time_str = str(round(wait_time*1000,2))+"ms" + else: wait_time_str = str(round(wait_time*1,2))+"s" ql_str = str(len(interface.announce_queue)) RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) @@ -1181,10 +1171,8 @@ class Transport: else: wait_time = max(interface.announce_allowed_at - time.time(), 0) - if wait_time < 1: - wait_time_str = str(round(wait_time*1000,2))+"ms" - else: - wait_time_str = str(round(wait_time*1,2))+"s" + if wait_time < 1: wait_time_str = str(round(wait_time*1000,2))+"ms" + else: wait_time_str = str(round(wait_time*1,2))+"s" ql_str = str(len(interface.announce_queue)) RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) @@ -1199,12 +1187,10 @@ class Transport: stored_hash = True Transport.transmit(interface, packet.raw) - if packet.packet_type == RNS.Packet.ANNOUNCE: - interface.sent_announce() + if packet.packet_type == RNS.Packet.ANNOUNCE: interface.sent_announce() packet_sent(packet) sent = True - Transport.jobs_locked = False return sent @staticmethod @@ -1253,8 +1239,7 @@ class Transport: RNS.log("Dropped invalid GROUP announce packet", RNS.LOG_DEBUG) return False - if not packet.packet_hash in Transport.packet_hashlist and not packet.packet_hash in Transport.packet_hashlist_prev: - return True + if not packet.packet_hash in Transport.packet_hashlist and not packet.packet_hash in Transport.packet_hashlist_prev: return True else: if packet.packet_type == RNS.Packet.ANNOUNCE: if packet.destination_type == RNS.Destination.SINGLE: @@ -1321,16 +1306,10 @@ class Transport: else: return - while (Transport.jobs_running): sleep(0.0005) - if Transport.identity == None: return - Transport.jobs_locked = True - packet = RNS.Packet(None, raw) - if not packet.unpack(): - Transport.jobs_locked = False - return + if not packet.unpack(): return packet.receiving_interface = interface packet.hops += 1 @@ -1430,9 +1409,7 @@ class Transport: # it, do so and stop processing. Otherwise resume # normal processing. if packet.context == RNS.Packet.CACHE_REQUEST: - if Transport.cache_request_packet(packet): - Transport.jobs_locked = False - return + if Transport.cache_request_packet(packet): return # If the packet is in transport, check whether we # are the designated next hop, and process it @@ -1503,7 +1480,7 @@ class Transport: False, # 7: Validated proof_timeout ] # 8: Proof timeout timestamp - Transport.link_table[RNS.Link.link_id_from_lr_packet(packet)] = link_entry + with Transport.link_table_lock: Transport.link_table[RNS.Link.link_id_from_lr_packet(packet)] = link_entry else: # Entry format is @@ -1511,10 +1488,10 @@ class Transport: outbound_interface, # 1: Outbound interface time.time() ] # 2: Timestamp - Transport.reverse_table[packet.getTruncatedHash()] = reverse_entry + with Transport.reverse_table_lock: Transport.reverse_table[packet.getTruncatedHash()] = reverse_entry Transport.transmit(outbound_interface, new_raw) - Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time() + with Transport.path_table_lock: Transport.path_table[packet.destination_hash][IDX_PT_TIMESTAMP] = time.time() else: # TODO: There should probably be some kind of REJECT @@ -1561,8 +1538,7 @@ class Transport: Transport.transmit(outbound_interface, new_raw) Transport.link_table[packet.destination_hash][IDX_LT_TIMESTAMP] = time.time() - # TODO: Test and possibly enable this at some point - # Transport.jobs_locked = False + # TODO: Can we return safely here? Test and possibly enable this at some point. # return @@ -1580,10 +1556,10 @@ class Transport: # by normal announce rate limiting. if interface.should_ingress_limit(): interface.hold_announce(packet) - Transport.jobs_locked = False return - local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) + with Transport.destinations_lock: + local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) if local_destination == None and RNS.Identity.validate_announce(packet): if packet.transport_id != None: received_from = packet.transport_id @@ -1618,7 +1594,8 @@ class Transport: # First, check that the announce is not for a destination # local to this system, and that hops are less than the max - if (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1): + with Transport.destinations_lock: local_and_hops_condition = (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1) + if local_and_hops_condition: announce_emitted = Transport.announce_emitted(packet) random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10] @@ -1700,36 +1677,38 @@ class Transport: if should_add: now = time.time() + is_from_local_client = Transport.from_local_client(packet) rate_blocked = False if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None: - if not packet.destination_hash in Transport.announce_rate_table: - rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]} - Transport.announce_rate_table[packet.destination_hash] = rate_entry - - else: - rate_entry = Transport.announce_rate_table[packet.destination_hash] - rate_entry["timestamps"].append(now) - - while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS: - rate_entry["timestamps"].pop(0) - - current_rate = now - rate_entry["last"] - - if now > rate_entry["blocked_until"]: - if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1 - else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1) - - if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace: - rate_target = packet.receiving_interface.announce_rate_target - rate_penalty = packet.receiving_interface.announce_rate_penalty - rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty - rate_blocked = True - else: - rate_entry["last"] = now + with Transport.announce_rate_table_lock: + if not packet.destination_hash in Transport.announce_rate_table: + rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]} + Transport.announce_rate_table[packet.destination_hash] = rate_entry else: - rate_blocked = True + rate_entry = Transport.announce_rate_table[packet.destination_hash] + rate_entry["timestamps"].append(now) + + while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS: + rate_entry["timestamps"].pop(0) + + current_rate = now - rate_entry["last"] + + if now > rate_entry["blocked_until"]: + if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1 + else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1) + + if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace: + rate_target = packet.receiving_interface.announce_rate_target + rate_penalty = packet.receiving_interface.announce_rate_penalty + rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty + rate_blocked = True + else: + rate_entry["last"] = now + + else: + rate_blocked = True retries = 0 @@ -1741,43 +1720,41 @@ class Transport: retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW) if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: - expires = now + Transport.AP_PATH_TIME + expires = now + Transport.AP_PATH_TIME elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: - expires = now + Transport.ROAMING_PATH_TIME + expires = now + Transport.ROAMING_PATH_TIME else: - expires = now + Transport.PATHFINDER_E + expires = now + Transport.PATHFINDER_E if not random_blob in random_blobs: random_blobs.append(random_blob) random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:] - if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: + if (RNS.Reticulum.transport_enabled() or is_from_local_client) and packet.context != RNS.Packet.PATH_RESPONSE: # Insert announce into announce table for retransmission - if rate_blocked: - RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG) - + if rate_blocked: RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG) else: - if Transport.from_local_client(packet): + if is_from_local_client: # If the announce is from a local client, # it is announced immediately, but only one time. retransmit_timeout = now retries = Transport.PATHFINDER_R - Transport.announce_table[packet.destination_hash] = [ - now, # 0: IDX_AT_TIMESTAMP - retransmit_timeout, # 1: IDX_AT_RTRNS_TMO - retries, # 2: IDX_AT_RETRIES - received_from, # 3: IDX_AT_RCVD_IF - announce_hops, # 4: IDX_AT_HOPS - packet, # 5: IDX_AT_PACKET - local_rebroadcasts, # 6: IDX_AT_LCL_RBRD - block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD - attached_interface, # 8: IDX_AT_ATTCHD_IF - ] + with Transport.announce_table_lock: + Transport.announce_table[packet.destination_hash] = [ + now, # 0: IDX_AT_TIMESTAMP + retransmit_timeout, # 1: IDX_AT_RTRNS_TMO + retries, # 2: IDX_AT_RETRIES + received_from, # 3: IDX_AT_RCVD_IF + announce_hops, # 4: IDX_AT_HOPS + packet, # 5: IDX_AT_PACKET + local_rebroadcasts, # 6: IDX_AT_LCL_RBRD + block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD + attached_interface, # 8: IDX_AT_ATTCHD_IF + ] - # TODO: Check from_local_client once and store result - elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: + elif is_from_local_client and packet.context == RNS.Packet.PATH_RESPONSE: # If this is a path response from a local client, # check if any external interfaces have pending # path requests. @@ -1787,17 +1764,18 @@ class Transport: retransmit_timeout = now retries = Transport.PATHFINDER_R - Transport.announce_table[packet.destination_hash] = [ - now, - retransmit_timeout, - retries, - received_from, - announce_hops, - packet, - local_rebroadcasts, - block_rebroadcasts, - attached_interface - ] + with Transport.announce_table_lock: + Transport.announce_table[packet.destination_hash] = [ + now, + retransmit_timeout, + retries, + received_from, + announce_hops, + packet, + local_rebroadcasts, + block_rebroadcasts, + attached_interface + ] # If we have any local clients connected, we re- # transmit the announce to them immediately @@ -1810,13 +1788,13 @@ class Transport: announce_data = packet.data # TODO: Shouldn't the context be PATH_RESPONSE in the first case here? - if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: + if is_from_local_client and packet.context == RNS.Packet.PATH_RESPONSE: for local_interface in Transport.local_client_interfaces: if packet.receiving_interface != local_interface: new_announce = RNS.Packet( announce_destination, announce_data, - RNS.Packet.ANNOUNCE, + RNS.Packet.ANNOUNCE, # <-- This one? context = announce_context, header_type = RNS.Packet.HEADER_2, transport_type = Transport.TRANSPORT, @@ -1880,73 +1858,75 @@ class Transport: if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce") path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash] - Transport.path_table[packet.destination_hash] = path_table_entry + with Transport.path_table_lock: Transport.path_table[packet.destination_hash] = path_table_entry RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) # If the receiving interface is a tunnel, we add the # announce to the tunnels table if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None: - tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] - paths = tunnel_entry[IDX_TT_PATHS] - paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash] - expires = time.time() + Transport.DESTINATION_TIMEOUT - tunnel_entry[IDX_TT_EXPIRES] = expires - RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) + with Transport.tunnels_lock: + tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] + paths = tunnel_entry[IDX_TT_PATHS] + paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash] + expires = time.time() + Transport.DESTINATION_TIMEOUT + tunnel_entry[IDX_TT_EXPIRES] = expires + RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) # Call externally registered callbacks from apps # wanting to know when an announce arrives - for handler in Transport.announce_handlers: - try: - # Check that the announced destination matches - # the handlers aspect filter - execute_callback = False - announce_identity = RNS.Identity.recall(packet.destination_hash) - if handler.aspect_filter == None: - # If the handlers aspect filter is set to - # None, we execute the callback in all cases - execute_callback = True - else: - handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity) - if packet.destination_hash == handler_expected_hash: execute_callback = True - - # If this is a path response, check whether the - # handler wants to receive it. - if packet.context == RNS.Packet.PATH_RESPONSE: - if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: pass - else: execute_callback = False - - if execute_callback: - if len(inspect.signature(handler.received_announce).parameters) == 3: - def job(): - handler.received_announce(destination_hash=packet.destination_hash, - announced_identity=announce_identity, - app_data=RNS.Identity.recall_app_data(packet.destination_hash)) - threading.Thread(target=job, daemon=True).start() - - elif len(inspect.signature(handler.received_announce).parameters) == 4: - def job(): - handler.received_announce(destination_hash=packet.destination_hash, - announced_identity=announce_identity, - app_data=RNS.Identity.recall_app_data(packet.destination_hash), - announce_packet_hash = packet.packet_hash) - threading.Thread(target=job, daemon=True).start() - - elif len(inspect.signature(handler.received_announce).parameters) == 5: - def job(): - handler.received_announce(destination_hash=packet.destination_hash, - announced_identity=announce_identity, - app_data=RNS.Identity.recall_app_data(packet.destination_hash), - announce_packet_hash = packet.packet_hash, - is_path_response = packet.context == RNS.Packet.PATH_RESPONSE) - threading.Thread(target=job, daemon=True).start() - + with Transport.announce_handler_lock: + for handler in Transport.announce_handlers: + try: + # Check that the announced destination matches + # the handlers aspect filter + execute_callback = False + announce_identity = RNS.Identity.recall(packet.destination_hash) + if handler.aspect_filter == None: + # If the handlers aspect filter is set to + # None, we execute the callback in all cases + execute_callback = True else: - raise TypeError("Invalid signature for announce handler callback") + handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity) + if packet.destination_hash == handler_expected_hash: execute_callback = True - except Exception as e: - RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - RNS.trace_exception(e) + # If this is a path response, check whether the + # handler wants to receive it. + if packet.context == RNS.Packet.PATH_RESPONSE: + if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: pass + else: execute_callback = False + + if execute_callback: + if len(inspect.signature(handler.received_announce).parameters) == 3: + def job(): + handler.received_announce(destination_hash=packet.destination_hash, + announced_identity=announce_identity, + app_data=RNS.Identity.recall_app_data(packet.destination_hash)) + threading.Thread(target=job, daemon=True).start() + + elif len(inspect.signature(handler.received_announce).parameters) == 4: + def job(): + handler.received_announce(destination_hash=packet.destination_hash, + announced_identity=announce_identity, + app_data=RNS.Identity.recall_app_data(packet.destination_hash), + announce_packet_hash = packet.packet_hash) + threading.Thread(target=job, daemon=True).start() + + elif len(inspect.signature(handler.received_announce).parameters) == 5: + def job(): + handler.received_announce(destination_hash=packet.destination_hash, + announced_identity=announce_identity, + app_data=RNS.Identity.recall_app_data(packet.destination_hash), + announce_packet_hash = packet.packet_hash, + is_path_response = packet.context == RNS.Packet.PATH_RESPONSE) + threading.Thread(target=job, daemon=True).start() + + else: + raise TypeError("Invalid signature for announce handler callback") + + except Exception as e: + RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) # Handling for link requests to local destinations elif packet.packet_type == RNS.Packet.LINKREQUEST: @@ -1992,10 +1972,9 @@ class Transport: cached_packet.unpack() RNS.Packet(destination=link, data=cached_packet.data, packet_type=cached_packet.packet_type, context=cached_packet.context).send() - - Transport.jobs_locked = False - else: - link.receive(packet) + + else: link.receive(packet) + else: # In the strange and rare case that an interface # is partly malfunctioning, and a link-associated @@ -2011,14 +1990,12 @@ class Transport: if destination.hash == packet.destination_hash and destination.type == packet.destination_type: packet.destination = destination if destination.receive(packet): - if destination.proof_strategy == RNS.Destination.PROVE_ALL: - packet.prove() + if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove() elif destination.proof_strategy == RNS.Destination.PROVE_APP: if destination.callbacks.proof_requested: try: - if destination.callbacks.proof_requested(packet): - packet.prove() + if destination.callbacks.proof_requested(packet): packet.prove() except Exception as e: RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -2128,8 +2105,6 @@ class Transport: if receipt in Transport.receipts: Transport.receipts.remove(receipt) - Transport.jobs_locked = False - @staticmethod def synthesize_tunnel(interface): interface_hash = interface.get_hash() @@ -2234,11 +2209,12 @@ class Transport: def register_destination(destination): destination.MTU = RNS.Reticulum.MTU if destination.direction == RNS.Destination.IN: - for registered_destination in Transport.destinations: - if destination.hash == registered_destination.hash: - raise KeyError("Attempt to register an already registered destination.") - - Transport.destinations.append(destination) + with Transport.destinations_lock: + for registered_destination in Transport.destinations: + if destination.hash == registered_destination.hash: + raise KeyError("Attempt to register an already registered destination.") + + Transport.destinations.append(destination) if Transport.owner.is_connected_to_shared_instance: if destination.type == RNS.Destination.SINGLE: @@ -2249,28 +2225,28 @@ class Transport: @staticmethod def deregister_destination(destination): - if destination in Transport.destinations: - Transport.destinations.remove(destination) + with Transport.destinations_lock: + if destination in Transport.destinations: Transport.destinations.remove(destination) @staticmethod def register_link(link): RNS.log("Registering link "+str(link), RNS.LOG_EXTREME) if link.initiator: - Transport.pending_links.append(link) + with Transport.pending_links_lock: Transport.pending_links.append(link) else: - Transport.active_links.append(link) + with Transport.active_links_lock: Transport.active_links.append(link) @staticmethod def activate_link(link): RNS.log("Activating link "+str(link), RNS.LOG_EXTREME) - if link in Transport.pending_links: - if link.status != RNS.Link.ACTIVE: - raise IOError("Invalid link state for link activation: "+str(link.status)) - Transport.pending_links.remove(link) - Transport.active_links.append(link) - link.status = RNS.Link.ACTIVE - else: - RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR) + with Transport.pending_links_lock: + if link in Transport.pending_links: + if link.status != RNS.Link.ACTIVE: raise IOError("Invalid link state for link activation: "+str(link.status)) + Transport.pending_links.remove(link) + with Transport.active_links_lock: Transport.active_links.append(link) + link.status = RNS.Link.ACTIVE + else: + RNS.log("Attempted to activate a link that was not in the pending table", RNS.LOG_ERROR) @staticmethod def register_announce_handler(handler): @@ -2283,9 +2259,9 @@ class Transport: optionally have a *receive_path_responses* attribute set to ``True``, to also receive all path responses, in addition to live announces. See the :ref:`Announce Example` for more info. """ - if hasattr(handler, "received_announce") and callable(handler.received_announce): - if hasattr(handler, "aspect_filter"): - Transport.announce_handlers.append(handler) + with Transport.announce_handler_lock: + if hasattr(handler, "received_announce") and callable(handler.received_announce): + if hasattr(handler, "aspect_filter"): Transport.announce_handlers.append(handler) @staticmethod def deregister_announce_handler(handler): @@ -2294,7 +2270,9 @@ class Transport: :param handler: The announce handler to be deregistered. """ - while handler in Transport.announce_handlers: Transport.announce_handlers.remove(handler) + with Transport.announce_handler_lock: + while handler in Transport.announce_handlers: Transport.announce_handlers.remove(handler) + gc.collect() @staticmethod @@ -2434,8 +2412,9 @@ class Transport: :param destination_hash: A destination hash as *bytes*. :returns: The number of hops to the specified destination, or ``RNS.Transport.PATHFINDER_M`` if the number of hops is unknown. """ - if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_HOPS] - else: return Transport.PATHFINDER_M + with Transport.path_table_lock: + if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_HOPS] + else: return Transport.PATHFINDER_M @staticmethod def next_hop(destination_hash): @@ -2443,8 +2422,9 @@ class Transport: :param destination_hash: A destination hash as *bytes*. :returns: The destination hash as *bytes* for the next hop to the specified destination, or *None* if the next hop is unknown. """ - if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_NEXT_HOP] - else: return None + with Transport.path_table_lock: + if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_NEXT_HOP] + else: return None @staticmethod def next_hop_interface(destination_hash): @@ -2452,8 +2432,9 @@ class Transport: :param destination_hash: A destination hash as *bytes*. :returns: The interface for the next hop to the specified destination, or *None* if the interface is unknown. """ - if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_RVCD_IF] - else: return None + with Transport.path_table_lock: + if destination_hash in Transport.path_table: return Transport.path_table[destination_hash][IDX_PT_RVCD_IF] + else: return None @staticmethod def next_hop_interface_bitrate(destination_hash): diff --git a/RNS/_version.py b/RNS/_version.py index c72e379..9b102be 100644 --- a/RNS/_version.py +++ b/RNS/_version.py @@ -1 +1 @@ -__version__ = "1.1.4" +__version__ = "1.1.5" diff --git a/tests/link.py b/tests/link.py index 9ba31bc..9f85645 100644 --- a/tests/link.py +++ b/tests/link.py @@ -768,7 +768,7 @@ class TestLink(unittest.TestCase): data = bytearray() for rx in received: - data.extend(rx) + if rx: data.extend(rx) rx_message = data print(f"Received {len(received)} chunks, totalling {len(rx_message)} bytes")