Added local destinations lookup map

This commit is contained in:
Mark Qvist 2026-04-17 11:39:14 +02:00
commit 8093c3cd2c
4 changed files with 109 additions and 51 deletions

View file

@ -411,7 +411,9 @@ class Destination:
else:
if packet.packet_type == RNS.Packet.DATA:
if self.callbacks.packet != None:
try: self.callbacks.packet(plaintext, packet)
try:
def job(): self.callbacks.packet(plaintext, packet)
threading.Thread(target=job, daemon=True).start()
except Exception as e:
RNS.log("Error while executing receive callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

View file

@ -96,6 +96,7 @@ class Transport:
interfaces = [] # All active interfaces
destinations = [] # All active destinations
destinations_map = {} # Destination hash map of active destinations
pending_links = [] # Links that are being established
active_links = [] # Links that are active
packet_hashlist = set() # A list of packet hashes for duplicate detection
@ -121,7 +122,9 @@ class Transport:
discovery_pr_tags = [] # A table for keeping track of tagged path requests
max_pr_tags = 32000 # Maximum amount of unique path request tags to remember
interfaces_lock = Lock()
destinations_lock = Lock()
destinations_map_lock = Lock()
inbound_announce_lock = Lock()
announce_table_lock = Lock()
announce_rate_table_lock = Lock()
@ -305,7 +308,8 @@ class Transport:
# over an interface. It is cached with it's non-
# increased hop-count.
announce_packet.hops += 1
Transport.path_table[destination_hash] = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet.packet_hash]
with Transport.path_table_lock:
Transport.path_table[destination_hash] = [timestamp, received_from, hops, expires, random_blobs, receiving_interface, announce_packet.packet_hash]
RNS.log("Loaded path table entry for "+RNS.prettyhexrep(destination_hash)+" from storage", RNS.LOG_DEBUG)
else:
RNS.log("Could not reconstruct path table entry from storage for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
@ -410,8 +414,9 @@ class Transport:
@staticmethod
def prioritize_interfaces():
try: Transport.interfaces.sort(key=lambda interface: interface.bitrate, reverse=True)
except Exception as e: RNS.log(f"Could not prioritize interfaces according to bitrate. The contained exception was: {e}", RNS.LOG_ERROR)
with Transport.interfaces_lock:
try: Transport.interfaces.sort(key=lambda interface: interface.bitrate, reverse=True)
except Exception as e: RNS.log(f"Could not prioritize interfaces according to bitrate. The contained exception was: {e}", RNS.LOG_ERROR)
@staticmethod
def enable_discovery():
@ -610,10 +615,14 @@ class Transport:
# Cull invalidated path requests
if time.time() > Transport.pending_prs_last_checked+Transport.pending_prs_check_interval:
stale_local_prs = []
with Transport.pending_local_prs_lock:
for destination_hash in Transport.pending_local_path_requests:
if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces:
Transport.pending_local_path_requests.pop(destination_hash)
stale_local_prs.append(destination_hash)
for destination_hash in stale_local_prs:
Transport.pending_local_path_requests.pop(destination_hash)
Transport.pending_prs_last_checked = time.time()
@ -1091,8 +1100,10 @@ class Transport:
should_transmit = False
elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
with Transport.destinations_lock:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
local_destination = None
with Transport.destinations_map_lock:
if packet.destination_hash in Transport.destinations_map:
local_destination = Transport.destinations_map[packet.destination_hash]
if local_destination != None:
# RNS.log("Allowing announce broadcast on roaming-mode interface from instance-local destination", RNS.LOG_EXTREME)
@ -1114,8 +1125,11 @@ class Transport:
should_transmit = False
elif interface.mode == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
with Transport.destinations_lock:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
local_destination = None
with Transport.destinations_map_lock:
if packet.destination_hash in Transport.destinations_map:
local_destination = Transport.destinations_map[packet.destination_hash]
if local_destination != None:
# RNS.log("Allowing announce broadcast on boundary-mode interface from instance-local destination", RNS.LOG_EXTREME)
pass
@ -1584,8 +1598,10 @@ class Transport:
interface.hold_announce(packet)
return
with Transport.destinations_lock:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
local_destination = None
with Transport.destinations_map_lock:
if packet.destination_hash in Transport.destinations_map:
local_destination = Transport.destinations_map[packet.destination_hash]
if local_destination == None and RNS.Identity.validate_announce(packet):
if packet.transport_id != None:
@ -1622,7 +1638,9 @@ class Transport:
# First, check that the announce is not for a destination
# local to this system, and that hops are less than the max
with Transport.destinations_lock: local_and_hops_condition = (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1)
with Transport.destinations_map_lock:
local_and_hops_condition = (packet.hops < Transport.PATHFINDER_M+1) and (not packet.destination_hash in Transport.destinations_map)
if local_and_hops_condition:
announce_emitted = Transport.announce_emitted(packet)
@ -1960,31 +1978,35 @@ class Transport:
# Handling for link requests to local destinations
elif packet.packet_type == RNS.Packet.LINKREQUEST:
if packet.transport_id == None or packet.transport_id == Transport.identity.hash:
for destination in Transport.destinations:
if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
path_mtu = RNS.Link.mtu_from_lr_packet(packet)
mode = RNS.Link.mode_from_lr_packet(packet)
if packet.receiving_interface.AUTOCONFIGURE_MTU or packet.receiving_interface.FIXED_MTU:
nh_mtu = packet.receiving_interface.HW_MTU
destination = None
with Transport.destinations_map_lock:
if packet.destination_hash in Transport.destinations_map:
destination = Transport.destinations_map[packet.destination_hash]
if destination and destination.type == packet.destination_type:
path_mtu = RNS.Link.mtu_from_lr_packet(packet)
mode = RNS.Link.mode_from_lr_packet(packet)
if packet.receiving_interface.AUTOCONFIGURE_MTU or packet.receiving_interface.FIXED_MTU:
nh_mtu = packet.receiving_interface.HW_MTU
else:
nh_mtu = RNS.Reticulum.MTU
if path_mtu:
if packet.receiving_interface.HW_MTU == None:
path_mtu = None
packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]
else:
nh_mtu = RNS.Reticulum.MTU
if nh_mtu < path_mtu:
try:
path_mtu = nh_mtu
clamped_mtu = RNS.Link.signalling_bytes(path_mtu, mode)
packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]+clamped_mtu
except Exception as e:
RNS.log(f"Dropping link request packet to local destination. The contained exception was: {e}", RNS.LOG_WARNING)
return
if path_mtu:
if packet.receiving_interface.HW_MTU == None:
path_mtu = None
packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]
else:
if nh_mtu < path_mtu:
try:
path_mtu = nh_mtu
clamped_mtu = RNS.Link.signalling_bytes(path_mtu, mode)
packet.data = packet.data[:-RNS.Link.LINK_MTU_SIZE]+clamped_mtu
except Exception as e:
RNS.log(f"Dropping link request packet to local destination. The contained exception was: {e}", RNS.LOG_WARNING)
return
packet.destination = destination
destination.receive(packet)
packet.destination = destination
destination.receive(packet)
# Handling for local data packets
elif packet.packet_type == RNS.Packet.DATA:
@ -2015,19 +2037,22 @@ class Transport:
while packet.packet_hash in Transport.packet_hashlist:
Transport.packet_hashlist.remove(packet.packet_hash)
else:
for destination in Transport.destinations:
if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
packet.destination = destination
if destination.receive(packet):
if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove()
destination = None
with Transport.destinations_map_lock:
if packet.destination_hash in Transport.destinations_map:
destination = Transport.destinations_map[packet.destination_hash]
elif destination.proof_strategy == RNS.Destination.PROVE_APP:
if destination.callbacks.proof_requested:
try:
if destination.callbacks.proof_requested(packet): packet.prove()
except Exception as e:
RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
break
if destination and destination.type == packet.destination_type:
packet.destination = destination
if destination.receive(packet):
if destination.proof_strategy == RNS.Destination.PROVE_ALL: packet.prove()
elif destination.proof_strategy == RNS.Destination.PROVE_APP:
if destination.callbacks.proof_requested:
try:
if destination.callbacks.proof_requested(packet): packet.prove()
except Exception as e:
RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
# Handling for proofs and link-request proofs
elif packet.packet_type == RNS.Packet.PROOF:
@ -2254,6 +2279,27 @@ class Transport:
RNS.log("Removing path to "+RNS.prettyhexrep(deprecated_path)+" from tunnel "+RNS.prettyhexrep(tunnel_id), RNS.LOG_DEBUG)
with Transport.tunnels_lock: paths.pop(deprecated_path)
@staticmethod
def clean_destinations_map():
with Transport.destinations_lock:
for destination in Transport.destinations:
with Transport.destinations_map_lock:
if not destination.hash in Transport.destinations_map:
Transport.destinations_map[destination.hash] = destination
with Transport.destinations_map_lock:
stale_destination_hashes = []
for destination_hash in Transport.destinations_map:
with Transport.destinations_lock:
found = False
for destination in Transport.destinations:
if destination.hash == destination_hash: found = True
if not found: stale_destination_hashes.append(destination_hash)
for destination_hash in stale_destination_hashes:
Transport.destinations_map.pop(destination_hash)
@staticmethod
def register_destination(destination):
destination.MTU = RNS.Reticulum.MTU
@ -2265,6 +2311,9 @@ class Transport:
Transport.destinations.append(destination)
with Transport.destinations_map_lock:
Transport.destinations_map[destination.hash] = destination
if Transport.owner.is_connected_to_shared_instance:
if destination.type == RNS.Destination.SINGLE:
def job():
@ -2277,6 +2326,10 @@ class Transport:
with Transport.destinations_lock:
if destination in Transport.destinations: Transport.destinations.remove(destination)
with Transport.destinations_map_lock:
if destination.hash in Transport.destinations_map:
Transport.destinations_map.pop(destination.hash)
@staticmethod
def register_link(link):
RNS.log("Registering link "+str(link), RNS.LOG_EXTREME)
@ -2761,8 +2814,11 @@ class Transport:
destination_exists_on_local_client = True
with Transport.pending_local_prs_lock:
Transport.pending_local_path_requests[destination_hash] = attached_interface
local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None)
local_destination = None
with Transport.destinations_map_lock:
if destination_hash in Transport.destinations_map: local_destination = Transport.destinations_map[destination_hash]
if local_destination != None:
local_destination.announce(path_response=True, tag=tag, attached_interface=attached_interface)
RNS.log("Answering path request for "+RNS.prettyhexrep(destination_hash)+interface_str+", destination is local to this system", RNS.LOG_DEBUG)