Refactoring work for free-threaded transport I/O. Added ingress control bypass on pending path requests.

This commit is contained in:
Mark Qvist 2026-04-15 18:48:17 +02:00
commit c3f0d98e41
4 changed files with 168 additions and 159 deletions

View file

@ -295,33 +295,26 @@ class Destination:
app_data = returned_app_data
signed_data = self.hash+self.identity.get_public_key()+self.name_hash+random_hash+ratchet
if app_data != None:
signed_data += app_data
if app_data != None: signed_data += app_data
signature = self.identity.sign(signed_data)
announce_data = self.identity.get_public_key()+self.name_hash+random_hash+ratchet+signature
if app_data != None:
announce_data += app_data
if app_data != None: announce_data += app_data
self.path_responses[tag] = [time.time(), announce_data]
if path_response:
announce_context = RNS.Packet.PATH_RESPONSE
else:
announce_context = RNS.Packet.NONE
if path_response: announce_context = RNS.Packet.PATH_RESPONSE
else: announce_context = RNS.Packet.NONE
if ratchet:
context_flag = RNS.Packet.FLAG_SET
else:
context_flag = RNS.Packet.FLAG_UNSET
if ratchet: context_flag = RNS.Packet.FLAG_SET
else: context_flag = RNS.Packet.FLAG_UNSET
announce_packet = RNS.Packet(self, announce_data, RNS.Packet.ANNOUNCE, context = announce_context,
attached_interface = attached_interface, context_flag=context_flag)
if send:
announce_packet.send()
else:
return announce_packet
if send: announce_packet.send()
else: return announce_packet
def accepts_links(self, accepts = None):
"""
@ -330,13 +323,10 @@ class Destination:
:param accepts: If ``True`` or ``False``, this method sets whether the destination accepts incoming link requests. If not provided or ``None``, the method returns whether the destination currently accepts link requests.
:returns: ``True`` or ``False`` depending on whether the destination accepts incoming link requests, if the *accepts* parameter is not provided or ``None``.
"""
if accepts == None:
return self.accept_link_requests
if accepts == None: return self.accept_link_requests
if accepts:
self.accept_link_requests = True
else:
self.accept_link_requests = False
if accepts: self.accept_link_requests = True
else: self.accept_link_requests = False
def set_link_established_callback(self, callback):
"""

View file

@ -55,8 +55,8 @@ class Interface:
# How many samples to use for announce
# frequency calculations
IA_FREQ_SAMPLES = 12
OA_FREQ_SAMPLES = 12
IA_FREQ_SAMPLES = 128
OA_FREQ_SAMPLES = 128
# Maximum amount of ingress limited announces
# to hold at any given time.
@ -66,12 +66,12 @@ class Interface:
# considered to be newly created. Two
# hours by default.
IC_NEW_TIME = 2*60*60
IC_BURST_FREQ_NEW = 3.5
IC_BURST_FREQ = 12
IC_BURST_FREQ_NEW = 6
IC_BURST_FREQ = 35
IC_BURST_HOLD = 1*60
IC_BURST_PENALTY = 5*60
IC_HELD_RELEASE_INTERVAL = 30
IC_DEQUE_MIN_SAMPLE = 8
IC_BURST_PENALTY = 15
IC_HELD_RELEASE_INTERVAL = 2
IC_DEQUE_MIN_SAMPLE = 32
AUTOCONFIGURE_MTU = False
FIXED_MTU = False
@ -123,13 +123,14 @@ class Interface:
if self.ic_burst_active:
if ia_freq < freq_threshold and time.time() > self.ic_burst_activated+self.ic_burst_hold:
self.ic_burst_active = False
self.ic_held_release = time.time() + self.ic_burst_penalty
return True
else:
if ia_freq > freq_threshold:
self.ic_burst_active = True
self.ic_burst_activated = time.time()
self.ic_held_release = time.time() + self.ic_burst_penalty
return True
else: return False
@ -174,7 +175,7 @@ class Interface:
def process_held_announces(self):
try:
if not self.should_ingress_limit() and len(self.held_announces) > 0 and time.time() > self.ic_held_release:
if len(self.held_announces) > 0 and time.time() > self.ic_held_release:
freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq
ia_freq = self.incoming_announce_frequency()
if ia_freq < freq_threshold:

View file

@ -722,12 +722,9 @@ class Link:
pass
def link_closed(self):
for resource in self.incoming_resources:
resource.cancel()
for resource in self.outgoing_resources:
resource.cancel()
if self._channel:
self._channel._shutdown()
for resource in self.incoming_resources: resource.cancel()
for resource in self.outgoing_resources: resource.cancel()
if self._channel: self._channel._shutdown()
self.prv = None
self.pub = None
@ -741,8 +738,7 @@ class Link:
self.destination.links.remove(self)
if self.callbacks.link_closed != None:
try:
self.callbacks.link_closed(self)
try: self.callbacks.link_closed(self)
except Exception as e:
RNS.log("Error while executing link closed callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

View file

@ -360,7 +360,7 @@ class Transport:
if len(tunnel_paths) > 0:
tunnel = [tunnel_id, None, tunnel_paths, expires]
Transport.tunnels[tunnel_id] = tunnel
with Transport.tunnels_lock: Transport.tunnels[tunnel_id] = tunnel
if len(Transport.path_table) == 1: specifier = "entry"
else: specifier = "entries"
@ -908,6 +908,7 @@ class Transport:
except Exception as e:
RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e) # TODO: Remove
for packet in outgoing: packet.send()
@ -1553,12 +1554,17 @@ class Transport:
# potential ingress limiting. Already known
# destinations will have re-announces controlled
# by normal announce rate limiting.
if interface.should_ingress_limit():
if packet.destination_hash in Transport.path_requests or packet.destination_hash in Transport.discovery_path_requests:
# RNS.log(f"Skipping ingress limit check for {RNS.prettyhexrep(packet.destination_hash)} due to waiting path requests", RNS.LOG_DEBUG)
pass
elif interface.should_ingress_limit():
interface.hold_announce(packet)
return
with Transport.destinations_lock:
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
if local_destination == None and RNS.Identity.validate_announce(packet):
if packet.transport_id != None:
received_from = packet.transport_id
@ -1575,14 +1581,15 @@ class Transport:
if announce_entry[IDX_AT_RETRIES] > 0:
if announce_entry[IDX_AT_LCL_RBRD] >= Transport.LOCAL_REBROADCASTS_MAX:
RNS.log("Completed announce processing for "+RNS.prettyhexrep(packet.destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME)
if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash)
with Transport.announce_table_lock:
if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash)
if packet.hops-1 == announce_entry[IDX_AT_HOPS]+1 and announce_entry[IDX_AT_RETRIES] > 0:
now = time.time()
if now < announce_entry[IDX_AT_RTRNS_TMO]:
RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to another node, no further tries needed", RNS.LOG_EXTREME)
if packet.destination_hash in Transport.announce_table:
Transport.announce_table.pop(packet.destination_hash)
with Transport.announce_table_lock:
if packet.destination_hash in Transport.announce_table: Transport.announce_table.pop(packet.destination_hash)
else:
received_from = packet.destination_hash
@ -1706,9 +1713,7 @@ class Transport:
else:
rate_entry["last"] = now
else:
rate_blocked = True
else: rate_blocked = True
retries = 0
announce_hops = packet.hops
@ -1961,29 +1966,31 @@ class Transport:
# Handling for local data packets
elif packet.packet_type == RNS.Packet.DATA:
if packet.destination_type == RNS.Destination.LINK:
for link in Transport.active_links:
if link.link_id == packet.destination_hash:
if link.attached_interface == packet.receiving_interface:
packet.link = link
if packet.context == RNS.Packet.CACHE_REQUEST:
cached_packet = Transport.get_cached_packet(packet.data)
if cached_packet != None:
cached_packet.unpack()
RNS.Packet(destination=link, data=cached_packet.data,
packet_type=cached_packet.packet_type, context=cached_packet.context).send()
with Transport.active_links_lock:
for link in Transport.active_links:
if link.link_id == packet.destination_hash:
if link.attached_interface == packet.receiving_interface:
packet.link = link
if packet.context == RNS.Packet.CACHE_REQUEST:
cached_packet = Transport.get_cached_packet(packet.data)
if cached_packet != None:
cached_packet.unpack()
RNS.Packet(destination=link, data=cached_packet.data,
packet_type=cached_packet.packet_type, context=cached_packet.context).send()
else: link.receive(packet)
break
else: link.receive(packet)
else:
# In the strange and rare case that an interface
# is partly malfunctioning, and a link-associated
# packet is being received on an interface that
# has failed sending, and transport has failed over
# to another path, we remove this packet hash from
# the filter hashlist so the link can receive the
# packet when it finally arrives over another path.
while packet.packet_hash in Transport.packet_hashlist:
Transport.packet_hashlist.remove(packet.packet_hash)
else:
# In the strange and rare case that an interface
# is partly malfunctioning, and a link-associated
# packet is being received on an interface that
# has failed sending, and transport has failed over
# to another path, we remove this packet hash from
# the filter hashlist so the link can receive the
# packet when it finally arrives over another path.
while packet.packet_hash in Transport.packet_hashlist:
Transport.packet_hashlist.remove(packet.packet_hash)
else:
for destination in Transport.destinations:
if destination.hash == packet.destination_hash and destination.type == packet.destination_type:
@ -1997,6 +2004,7 @@ class Transport:
if destination.callbacks.proof_requested(packet): packet.prove()
except Exception as e:
RNS.log("Error while executing proof request callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
break
# Handling for proofs and link-request proofs
elif packet.packet_type == RNS.Packet.PROOF:
@ -2038,48 +2046,57 @@ class Transport:
RNS.log("Link request proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG)
else:
RNS.log("Received link request proof with hop mismatch, not transporting it", RNS.LOG_DEBUG)
else:
# Check if we can deliver it to a local
# pending link
for link in Transport.pending_links:
if link.link_id == packet.destination_hash:
# We need to also allow an expected hops value of
# PATHFINDER_M, since in some cases, the number of hops
# to the destination will be unknown at link creation
# time. The real chance of this occuring is likely to be
# extremely small, and this allowance could probably
# be discarded without major issues, but it is kept
# for now to ensure backwards compatibility.
# TODO: Probably reset check back to
# if packet.hops == link.expected_hops:
# within one of the next releases
pending_link = None
with Transport.pending_links_lock:
for link in Transport.pending_links:
if link.link_id == packet.destination_hash:
# We need to also allow an expected hops value of
# PATHFINDER_M, since in some cases, the number of hops
# to the destination will be unknown at link creation
# time. The real chance of this occuring is likely to be
# extremely small, and this allowance could probably
# be discarded without major issues, but it is kept
# for now to ensure backwards compatibility.
if packet.hops == link.expected_hops or link.expected_hops == RNS.Transport.PATHFINDER_M:
# Add this packet to the filter hashlist if we
# have determined that it's actually destined
# for this system, and then validate the proof
Transport.add_packet_hash(packet.packet_hash)
link.validate_proof(packet)
# TODO: Probably reset check back to
# if packet.hops == link.expected_hops:
# within one of the next releases
if packet.hops == link.expected_hops or link.expected_hops == RNS.Transport.PATHFINDER_M:
# Add this packet to the filter hashlist if we
# have determined that it's actually destined
# for this system, and then validate the proof
Transport.add_packet_hash(packet.packet_hash)
pending_link = link
break
if pending_link: pending_link.validate_proof(packet)
elif packet.context == RNS.Packet.RESOURCE_PRF:
for link in Transport.active_links:
if link.link_id == packet.destination_hash:
link.receive(packet)
else:
if packet.destination_type == RNS.Destination.LINK:
with Transport.active_links_lock:
for link in Transport.active_links:
if link.link_id == packet.destination_hash:
packet.link = link
link.receive(packet)
break
else:
if packet.destination_type == RNS.Destination.LINK:
with Transport.active_links_lock:
for link in Transport.active_links:
if link.link_id == packet.destination_hash:
packet.link = link
break
if len(packet.data) == RNS.PacketReceipt.EXPL_LENGTH:
proof_hash = packet.data[:RNS.Identity.HASHLENGTH//8]
else:
proof_hash = None
if len(packet.data) == RNS.PacketReceipt.EXPL_LENGTH: proof_hash = packet.data[:RNS.Identity.HASHLENGTH//8]
else: proof_hash = None
# Check if this proof needs to be transported
if (RNS.Reticulum.transport_enabled() or from_local_client or proof_for_local_client) and packet.destination_hash in Transport.reverse_table:
reverse_entry = Transport.reverse_table.pop(packet.destination_hash)
with Transport.reverse_table_lock: reverse_entry = Transport.reverse_table.pop(packet.destination_hash)
if packet.receiving_interface == reverse_entry[IDX_RT_OUTB_IF]:
RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[IDX_RT_RCVD_IF]), RNS.LOG_EXTREME)
new_raw = packet.raw[0:1]
@ -2089,20 +2106,21 @@ class Transport:
else:
RNS.log("Proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG)
for receipt in Transport.receipts:
receipt_validated = False
if proof_hash != None:
# Only test validation if hash matches
if receipt.hash == proof_hash:
with Transport.receipts_lock:
for receipt in Transport.receipts:
receipt_validated = False
if proof_hash != None:
# Only test validation if hash matches
if receipt.hash == proof_hash:
receipt_validated = receipt.validate_proof_packet(packet)
else:
# In case of an implicit proof, we have
# to check every single outstanding receipt
receipt_validated = receipt.validate_proof_packet(packet)
else:
# In case of an implicit proof, we have
# to check every single outstanding receipt
receipt_validated = receipt.validate_proof_packet(packet)
if receipt_validated:
if receipt in Transport.receipts:
Transport.receipts.remove(receipt)
if receipt_validated:
if receipt in Transport.receipts:
Transport.receipts.remove(receipt)
@staticmethod
def synthesize_tunnel(interface):
@ -2151,9 +2169,10 @@ class Transport:
@staticmethod
def void_tunnel_interface(tunnel_id):
if tunnel_id in Transport.tunnels:
RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_EXTREME)
Transport.tunnels[tunnel_id][IDX_TT_IF] = None
with Transport.tunnels_lock:
if tunnel_id in Transport.tunnels:
RNS.log(f"Voiding tunnel interface {Transport.tunnels[tunnel_id][IDX_TT_IF]}", RNS.LOG_EXTREME)
Transport.tunnels[tunnel_id][IDX_TT_IF] = None
@staticmethod
def handle_tunnel(tunnel_id, interface):
@ -2161,9 +2180,10 @@ class Transport:
if not tunnel_id in Transport.tunnels:
RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" established.", RNS.LOG_DEBUG)
paths = {}
tunnel_entry = [tunnel_id, interface, paths, expires]
interface.tunnel_id = tunnel_id
Transport.tunnels[tunnel_id] = tunnel_entry
with Transport.tunnels_lock:
tunnel_entry = [tunnel_id, interface, paths, expires]
interface.tunnel_id = tunnel_id
Transport.tunnels[tunnel_id] = tunnel_entry
else:
RNS.log("Tunnel endpoint "+RNS.prettyhexrep(tunnel_id)+" reappeared. Restoring paths...", RNS.LOG_DEBUG)
tunnel_entry = Transport.tunnels[tunnel_id]
@ -2173,36 +2193,38 @@ class Transport:
paths = tunnel_entry[IDX_TT_PATHS]
deprecated_paths = []
for destination_hash, path_entry in paths.items():
received_from = path_entry[1]
announce_hops = path_entry[2]
expires = path_entry[3]
random_blobs = list(set(path_entry[4]))
receiving_interface = interface
packet_hash = path_entry[6]
new_entry = [time.time(), received_from, announce_hops, expires, random_blobs, receiving_interface, packet_hash]
with Transport.tunnels_lock:
for destination_hash, path_entry in paths.items():
received_from = path_entry[1]
announce_hops = path_entry[2]
expires = path_entry[3]
random_blobs = list(set(path_entry[4]))
receiving_interface = interface
packet_hash = path_entry[6]
new_entry = [time.time(), received_from, announce_hops, expires, random_blobs, receiving_interface, packet_hash]
should_add = False
if destination_hash in Transport.path_table:
old_entry = Transport.path_table[destination_hash]
old_hops = old_entry[IDX_PT_HOPS]
old_expires = old_entry[IDX_PT_EXPIRES]
if announce_hops <= old_hops or time.time() > old_expires: should_add = True
else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because a newer path with fewer hops exist", RNS.LOG_DEBUG)
else:
if time.time() < expires: should_add = True
else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because it has expired", RNS.LOG_DEBUG)
should_add = False
with Transport.path_table_lock:
if destination_hash in Transport.path_table:
old_entry = Transport.path_table[destination_hash]
old_hops = old_entry[IDX_PT_HOPS]
old_expires = old_entry[IDX_PT_EXPIRES]
if announce_hops <= old_hops or time.time() > old_expires: should_add = True
else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because a newer path with fewer hops exist", RNS.LOG_DEBUG)
else:
if time.time() < expires: should_add = True
else: RNS.log("Did not restore path to "+RNS.prettyhexrep(destination_hash)+" because it has expired", RNS.LOG_DEBUG)
if should_add:
Transport.path_table[destination_hash] = new_entry
RNS.log("Restored path to "+RNS.prettyhexrep(destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_DEBUG)
else:
deprecated_paths.append(destination_hash)
if should_add:
with Transport.path_table_lock: Transport.path_table[destination_hash] = new_entry
RNS.log("Restored path to "+RNS.prettyhexrep(destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_DEBUG)
else: deprecated_paths.append(destination_hash)
for deprecated_path in deprecated_paths:
RNS.log("Removing path to "+RNS.prettyhexrep(deprecated_path)+" from tunnel "+RNS.prettyhexrep(tunnel_id), RNS.LOG_DEBUG)
paths.pop(deprecated_path)
with Transport.tunnels_lock: paths.pop(deprecated_path)
@staticmethod
def register_destination(destination):
@ -2302,8 +2324,8 @@ class Transport:
def clean_announce_cache():
st = time.time()
target_path = os.path.join(RNS.Reticulum.cachepath, "announces")
active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table]
tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict]))
with Transport.path_table_lock: active_paths = [Transport.path_table[dst_hash][6] for dst_hash in Transport.path_table]
with Transport.tunnels_lock: tunnel_paths = list(set([path_dict[dst_hash][6] for path_dict in [Transport.tunnels[tunnel_id][2] for tunnel_id in Transport.tunnels] for dst_hash in path_dict]))
removed = 0
for packet_hash in os.listdir(target_path):
remove = False
@ -2314,8 +2336,7 @@ class Transport:
if (not target_hash in active_paths) and (not target_hash in tunnel_paths): remove = True
if remove: os.unlink(full_path); removed += 1
if removed > 0:
RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
if removed > 0: RNS.log(f"Removed {removed} cached announces in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
# When caching packets to storage, they are written
# exactly as they arrived over their interface. This
@ -2447,8 +2468,8 @@ class Transport:
if next_hop_interface != None:
if next_hop_interface.AUTOCONFIGURE_MTU or next_hop_interface.FIXED_MTU: return next_hop_interface.HW_MTU
else: return None
else:
return None
else: return None
@staticmethod
def next_hop_per_bit_latency(destination_hash):
@ -2511,7 +2532,8 @@ class Transport:
def path_is_unresponsive(destination_hash):
with Transport.path_states_lock:
if destination_hash in Transport.path_states:
if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: return True
if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE:
return True
return False
@ -2757,7 +2779,8 @@ class Transport:
held_entry = Transport.announce_table[packet.destination_hash]
Transport.held_announces[packet.destination_hash] = held_entry
Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface]
with Transport.announce_table_lock:
Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, announce_hops, packet, local_rebroadcasts, block_rebroadcasts, attached_interface]
elif is_from_local_client:
# Forward path request on all interfaces
@ -2776,7 +2799,7 @@ class Transport:
# except the requestor interface
RNS.log("Attempting to discover unknown path to "+RNS.prettyhexrep(destination_hash)+" on behalf of path request"+interface_str, RNS.LOG_DEBUG)
pr_entry = { "destination_hash": destination_hash, "timeout": time.time()+Transport.PATH_REQUEST_TIMEOUT, "requesting_interface": attached_interface }
Transport.discovery_path_requests[destination_hash] = pr_entry
with Transport.discovery_pr_lock: Transport.discovery_path_requests[destination_hash] = pr_entry
for interface in Transport.interfaces:
if not interface == attached_interface:
@ -2876,11 +2899,11 @@ class Transport:
@staticmethod
def shared_connection_disappeared():
for link in Transport.active_links:
link.teardown()
with Transport.active_links_lock:
for link in Transport.active_links: link.teardown()
for link in Transport.pending_links:
link.teardown()
with Transport.pending_links_lock:
for link in Transport.pending_links: link.teardown()
Transport.announce_table = {}
Transport.path_table = {}
@ -2892,11 +2915,10 @@ class Transport:
@staticmethod
def shared_connection_reappeared():
if Transport.owner.is_connected_to_shared_instance:
for registered_destination in Transport.destinations:
for registered_destination in Transport.destinations.copy():
if registered_destination.type == RNS.Destination.SINGLE:
registered_destination.announce(path_response=True)
@staticmethod
def drop_announce_queues():
for interface in Transport.interfaces: