Refactored transport jobs for free-threaded implementation

This commit is contained in:
Mark Qvist 2026-04-12 13:33:15 +02:00
commit fa353fb0b3

View file

@ -124,8 +124,14 @@ class Transport:
path_table_lock = Lock()
reverse_table_lock = Lock()
link_table_lock = Lock()
active_links_lock = Lock()
pending_links_lock = Lock()
tunnels_lock = Lock()
receipts_lock = Lock()
discovery_pr_lock = Lock()
discovery_pr_tags_lock = Lock()
path_requests_lock = Lock()
pending_local_prs_lock = Lock()
path_states_lock = Lock()
jobs_lock = Lock()
@ -470,118 +476,128 @@ class Transport:
Transport.jobs_running = True
try:
if not Transport.jobs_locked:
with Transport.jobs_lock:
should_collect = False
# Process active and pending link lists
if time.time() > Transport.links_last_checked+Transport.links_check_interval:
for link in Transport.pending_links:
if link.status == RNS.Link.CLOSED:
# If we are not a Transport Instance, finding a pending link
# that was never activated will trigger an expiry of the path
# to the destination, and an attempt to rediscover the path.
if not RNS.Reticulum.transport_enabled():
Transport.expire_path(link.destination.hash)
with Transport.pending_links_lock:
for link in Transport.pending_links:
if link.status == RNS.Link.CLOSED:
# If we are not a Transport Instance, finding a pending link
# that was never activated will trigger an expiry of the path
# to the destination, and an attempt to rediscover the path.
if not RNS.Reticulum.transport_enabled():
Transport.expire_path(link.destination.hash)
# If we are connected to a shared instance, it will take
# care of sending out a new path request. If not, we will
# send one directly.
if not Transport.owner.is_connected_to_shared_instance:
last_path_request = 0
if link.destination.hash in Transport.path_requests:
last_path_request = Transport.path_requests[link.destination.hash]
# If we are connected to a shared instance, it will take
# care of sending out a new path request. If not, we will
# send one directly.
if not Transport.owner.is_connected_to_shared_instance:
last_path_request = 0
with Transport.path_requests_lock:
if link.destination.hash in Transport.path_requests:
last_path_request = Transport.path_requests[link.destination.hash]
if time.time() - last_path_request > Transport.PATH_REQUEST_MI:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link.destination.hash)+" since an attempted link was never established", RNS.LOG_DEBUG)
if not link.destination.hash in path_requests:
blocked_if = None
path_requests[link.destination.hash] = blocked_if
if time.time() - last_path_request > Transport.PATH_REQUEST_MI:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link.destination.hash)+" since an attempted link was never established", RNS.LOG_DEBUG)
if not link.destination.hash in path_requests:
blocked_if = None
path_requests[link.destination.hash] = blocked_if
Transport.pending_links.remove(link)
Transport.pending_links.remove(link)
for link in Transport.active_links:
if link.status == RNS.Link.CLOSED:
Transport.active_links.remove(link)
with Transport.active_links_lock:
closed_links = []
for link in Transport.active_links:
if link.status == RNS.Link.CLOSED: closed_links.append(link)
for closed_link in closed_links: Transport.active_links.remove(closed_link)
Transport.links_last_checked = time.time()
# Process receipts list for timed-out packets
if time.time() > Transport.receipts_last_checked+Transport.receipts_check_interval:
while len(Transport.receipts) > Transport.MAX_RECEIPTS:
culled_receipt = Transport.receipts.pop(0)
culled_receipt.timeout = -1
culled_receipt.check_timeout()
should_collect = True
with Transport.receipts_lock:
while len(Transport.receipts) > Transport.MAX_RECEIPTS:
culled_receipt = Transport.receipts.pop(0)
culled_receipt.timeout = -1
culled_receipt.check_timeout()
should_collect = True
for receipt in Transport.receipts:
receipt.check_timeout()
if receipt.status != RNS.PacketReceipt.SENT:
if receipt in Transport.receipts:
Transport.receipts.remove(receipt)
with Transport.receipts_lock:
expired_receipts = []
for receipt in Transport.receipts:
receipt.check_timeout()
if receipt.status != RNS.PacketReceipt.SENT: expired_receipts.append(receipt)
for expired_receipt in expired_receipts:
if expired_receipt in Transport.receipts: Transport.receipts.remove(expired_receipt)
Transport.receipts_last_checked = time.time()
# Process announces needing retransmission
if time.time() > Transport.announces_last_checked+Transport.announces_check_interval:
completed_announces = []
for destination_hash in Transport.announce_table:
announce_entry = Transport.announce_table[destination_hash]
if announce_entry[IDX_AT_RETRIES] > 0 and announce_entry[IDX_AT_RETRIES] >= Transport.LOCAL_REBROADCASTS_MAX:
RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME)
completed_announces.append(destination_hash)
elif announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R:
RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME)
completed_announces.append(destination_hash)
else:
if time.time() > announce_entry[IDX_AT_RTRNS_TMO]:
announce_entry[IDX_AT_RTRNS_TMO] = time.time() + Transport.PATHFINDER_G + Transport.PATHFINDER_RW
announce_entry[IDX_AT_RETRIES] += 1
packet = announce_entry[IDX_AT_PACKET]
block_rebroadcasts = announce_entry[IDX_AT_BLCK_RBRD]
attached_interface = announce_entry[IDX_AT_ATTCHD_IF]
announce_context = RNS.Packet.NONE
if block_rebroadcasts: announce_context = RNS.Packet.PATH_RESPONSE
announce_data = packet.data
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
announce_destination.hash = packet.destination_hash
announce_destination.hexhash = announce_destination.hash.hex()
new_packet = RNS.Packet(
announce_destination,
announce_data,
RNS.Packet.ANNOUNCE,
context = announce_context,
header_type = RNS.Packet.HEADER_2,
transport_type = Transport.TRANSPORT,
transport_id = Transport.identity.hash,
attached_interface = attached_interface,
context_flag = packet.context_flag,
)
with Transport.announce_table_lock:
for destination_hash in Transport.announce_table:
announce_entry = Transport.announce_table[destination_hash]
if announce_entry[IDX_AT_RETRIES] > 0 and announce_entry[IDX_AT_RETRIES] >= Transport.LOCAL_REBROADCASTS_MAX:
RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", local rebroadcast limit reached", RNS.LOG_EXTREME)
completed_announces.append(destination_hash)
elif announce_entry[IDX_AT_RETRIES] > Transport.PATHFINDER_R:
RNS.log("Completed announce processing for "+RNS.prettyhexrep(destination_hash)+", retry limit reached", RNS.LOG_EXTREME)
completed_announces.append(destination_hash)
else:
if time.time() > announce_entry[IDX_AT_RTRNS_TMO]:
announce_entry[IDX_AT_RTRNS_TMO] = time.time() + Transport.PATHFINDER_G + Transport.PATHFINDER_RW
announce_entry[IDX_AT_RETRIES] += 1
packet = announce_entry[IDX_AT_PACKET]
block_rebroadcasts = announce_entry[IDX_AT_BLCK_RBRD]
attached_interface = announce_entry[IDX_AT_ATTCHD_IF]
announce_context = RNS.Packet.NONE
if block_rebroadcasts: announce_context = RNS.Packet.PATH_RESPONSE
announce_data = packet.data
announce_identity = RNS.Identity.recall(packet.destination_hash)
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
announce_destination.hash = packet.destination_hash
announce_destination.hexhash = announce_destination.hash.hex()
new_packet = RNS.Packet(
announce_destination,
announce_data,
RNS.Packet.ANNOUNCE,
context = announce_context,
header_type = RNS.Packet.HEADER_2,
transport_type = Transport.TRANSPORT,
transport_id = Transport.identity.hash,
attached_interface = attached_interface,
context_flag = packet.context_flag,
)
new_packet.hops = announce_entry[4]
if block_rebroadcasts:
RNS.log("Rebroadcasting announce as path response for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG)
else:
RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG)
outgoing.append(new_packet)
new_packet.hops = announce_entry[4]
if block_rebroadcasts:
RNS.log("Rebroadcasting announce as path response for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG)
else:
RNS.log("Rebroadcasting announce for "+RNS.prettyhexrep(announce_destination.hash)+" with hop count "+str(new_packet.hops), RNS.LOG_DEBUG)
outgoing.append(new_packet)
# This handles an edge case where a peer sends a past
# request for a destination just after an announce for
# said destination has arrived, but before it has been
# rebroadcast locally. In such a case the actual announce
# is temporarily held, and then reinserted when the path
# request has been served to the peer.
if destination_hash in Transport.held_announces:
held_entry = Transport.held_announces.pop(destination_hash)
Transport.announce_table[destination_hash] = held_entry
RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG)
# This handles an edge case where a peer sends a past
# request for a destination just after an announce for
# said destination has arrived, but before it has been
# rebroadcast locally. In such a case the actual announce
# is temporarily held, and then reinserted when the path
# request has been served to the peer.
if destination_hash in Transport.held_announces:
held_entry = Transport.held_announces.pop(destination_hash)
Transport.announce_table[destination_hash] = held_entry
RNS.log("Reinserting held announce into table", RNS.LOG_DEBUG)
for destination_hash in completed_announces:
if destination_hash in Transport.announce_table:
Transport.announce_table.pop(destination_hash)
for destination_hash in completed_announces:
if destination_hash in Transport.announce_table: Transport.announce_table.pop(destination_hash)
Transport.announces_last_checked = time.time()
@ -593,233 +609,244 @@ class Transport:
# Cull invalidated path requests
if time.time() > Transport.pending_prs_last_checked+Transport.pending_prs_check_interval:
for destination_hash in Transport.pending_local_path_requests.copy():
if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces:
Transport.pending_local_path_requests.pop(destination_hash)
with Transport.pending_local_prs_lock:
for destination_hash in Transport.pending_local_path_requests:
if not Transport.pending_local_path_requests[destination_hash] in Transport.interfaces:
Transport.pending_local_path_requests.pop(destination_hash)
Transport.pending_prs_last_checked = time.time()
# Cull the path request tags list if it has reached its max size
if len(Transport.discovery_pr_tags) > Transport.max_pr_tags:
Transport.discovery_pr_tags = Transport.discovery_pr_tags[len(Transport.discovery_pr_tags)-Transport.max_pr_tags:len(Transport.discovery_pr_tags)-1]
with Transport.discovery_pr_tags_lock:
Transport.discovery_pr_tags = Transport.discovery_pr_tags[len(Transport.discovery_pr_tags)-Transport.max_pr_tags:len(Transport.discovery_pr_tags)-1]
if time.time() > Transport.tables_last_culled + Transport.tables_cull_interval:
# Remove unneeded path state entries
stale_path_states = []
for destination_hash in Transport.path_states:
if not destination_hash in Transport.path_table:
stale_path_states.append(destination_hash)
with Transport.path_states_lock:
for destination_hash in Transport.path_states:
if not destination_hash in Transport.path_table:
stale_path_states.append(destination_hash)
# Cull the reverse table according to timeout
stale_reverse_entries = []
for truncated_packet_hash in Transport.reverse_table:
reverse_entry = Transport.reverse_table[truncated_packet_hash]
if time.time() > reverse_entry[IDX_RT_TIMESTAMP] + Transport.REVERSE_TIMEOUT:
stale_reverse_entries.append(truncated_packet_hash)
elif not reverse_entry[IDX_RT_OUTB_IF] in Transport.interfaces:
stale_reverse_entries.append(truncated_packet_hash)
elif not reverse_entry[IDX_RT_RCVD_IF] in Transport.interfaces:
stale_reverse_entries.append(truncated_packet_hash)
with Transport.reverse_table_lock:
for truncated_packet_hash in Transport.reverse_table:
reverse_entry = Transport.reverse_table[truncated_packet_hash]
if time.time() > reverse_entry[IDX_RT_TIMESTAMP] + Transport.REVERSE_TIMEOUT: stale_reverse_entries.append(truncated_packet_hash)
elif not reverse_entry[IDX_RT_OUTB_IF] in Transport.interfaces: stale_reverse_entries.append(truncated_packet_hash)
elif not reverse_entry[IDX_RT_RCVD_IF] in Transport.interfaces: stale_reverse_entries.append(truncated_packet_hash)
# Cull the link table according to timeout
stale_links = []
for link_id in Transport.link_table:
link_entry = Transport.link_table[link_id]
with Transport.link_table_lock:
for link_id in Transport.link_table:
link_entry = Transport.link_table[link_id]
if link_entry[IDX_LT_VALIDATED] == True:
if time.time() > link_entry[IDX_LT_TIMESTAMP] + Transport.LINK_TIMEOUT:
stale_links.append(link_id)
elif not link_entry[IDX_LT_NH_IF] in Transport.interfaces:
stale_links.append(link_id)
elif not link_entry[IDX_LT_RCVD_IF] in Transport.interfaces:
stale_links.append(link_id)
else:
if time.time() > link_entry[IDX_LT_PROOF_TMO]:
stale_links.append(link_id)
if link_entry[IDX_LT_VALIDATED] == True:
if time.time() > link_entry[IDX_LT_TIMESTAMP] + Transport.LINK_TIMEOUT: stale_links.append(link_id)
elif not link_entry[IDX_LT_NH_IF] in Transport.interfaces: stale_links.append(link_id)
elif not link_entry[IDX_LT_RCVD_IF] in Transport.interfaces: stale_links.append(link_id)
else:
if time.time() > link_entry[IDX_LT_PROOF_TMO]:
stale_links.append(link_id)
last_path_request = 0
if link_entry[IDX_LT_DSTHASH] in Transport.path_requests:
last_path_request = Transport.path_requests[link_entry[IDX_LT_DSTHASH]]
last_path_request = 0
with Transport.path_requests_lock:
if link_entry[IDX_LT_DSTHASH] in Transport.path_requests:
last_path_request = Transport.path_requests[link_entry[IDX_LT_DSTHASH]]
lr_taken_hops = link_entry[IDX_LT_HOPS]
lr_taken_hops = link_entry[IDX_LT_HOPS]
path_request_throttle = time.time() - last_path_request < Transport.PATH_REQUEST_MI
path_request_conditions = False
# If the path has been invalidated between the time of
# making the link request and now, try to rediscover it
if not Transport.has_path(link_entry[IDX_LT_DSTHASH]):
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and path is now missing", RNS.LOG_DEBUG)
path_request_conditions =True
path_request_throttle = time.time() - last_path_request < Transport.PATH_REQUEST_MI
path_request_conditions = False
# If the path has been invalidated between the time of
# making the link request and now, try to rediscover it
if not Transport.has_path(link_entry[IDX_LT_DSTHASH]):
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and path is now missing", RNS.LOG_DEBUG)
path_request_conditions = True
# If this link request was originated from a local client
# attempt to rediscover a path to the destination, if this
# has not already happened recently.
elif not path_request_throttle and lr_taken_hops == 0:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted local client link was never established", RNS.LOG_DEBUG)
path_request_conditions = True
# If this link request was originated from a local client
# attempt to rediscover a path to the destination, if this
# has not already happened recently.
elif not path_request_throttle and lr_taken_hops == 0:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted local client link was never established", RNS.LOG_DEBUG)
path_request_conditions = True
# If the link destination was previously only 1 hop
# away, this likely means that it was local to one
# of our interfaces, and that it roamed somewhere else.
# In that case, try to discover a new path, and mark
# the old one as unresponsive.
elif not path_request_throttle and Transport.hops_to(link_entry[IDX_LT_DSTHASH]) == 1:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and destination was previously local to an interface on this instance", RNS.LOG_DEBUG)
path_request_conditions = True
blocked_if = link_entry[IDX_LT_RCVD_IF]
# If the link destination was previously only 1 hop
# away, this likely means that it was local to one
# of our interfaces, and that it roamed somewhere else.
# In that case, try to discover a new path, and mark
# the old one as unresponsive.
elif not path_request_throttle and Transport.hops_to(link_entry[IDX_LT_DSTHASH]) == 1:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and destination was previously local to an interface on this instance", RNS.LOG_DEBUG)
path_request_conditions = True
blocked_if = link_entry[IDX_LT_RCVD_IF]
# TODO: This might result in the path re-resolution
# only being able to happen once, since new path found
# after allowing update from higher hop-count path, after
# marking old path unresponsive, might be more than 1 hop away,
# thus dealocking us into waiting for a new announce all-together.
# Is this problematic, or does it actually not matter?
# Best would be to have full support for alternative paths,
# and score them according to number of unsuccessful tries or
# similar.
if RNS.Reticulum.transport_enabled():
if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH])
# TODO: This might result in the path re-resolution
# only being able to happen once, since new path found
# after allowing update from higher hop-count path, after
# marking old path unresponsive, might be more than 1 hop away,
# thus dealocking us into waiting for a new announce all-together.
# Is this problematic, or does it actually not matter?
# Best would be to have full support for alternative paths,
# and score them according to number of unsuccessful tries or
# similar.
if RNS.Reticulum.transport_enabled():
if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH])
# If the link initiator is only 1 hop away,
# this likely means that network topology has
# changed. In that case, we try to discover a new path,
# and mark the old one as potentially unresponsive.
elif not path_request_throttle and lr_taken_hops == 1:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and link initiator is local to an interface on this instance", RNS.LOG_DEBUG)
path_request_conditions = True
blocked_if = link_entry[IDX_LT_RCVD_IF]
# If the link initiator is only 1 hop away,
# this likely means that network topology has
# changed. In that case, we try to discover a new path,
# and mark the old one as potentially unresponsive.
elif not path_request_throttle and lr_taken_hops == 1:
RNS.log("Trying to rediscover path for "+RNS.prettyhexrep(link_entry[IDX_LT_DSTHASH])+" since an attempted link was never established, and link initiator is local to an interface on this instance", RNS.LOG_DEBUG)
path_request_conditions = True
blocked_if = link_entry[IDX_LT_RCVD_IF]
if RNS.Reticulum.transport_enabled():
if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH])
if RNS.Reticulum.transport_enabled():
if hasattr(link_entry[IDX_LT_RCVD_IF], "mode") and link_entry[IDX_LT_RCVD_IF].mode != RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
Transport.mark_path_unresponsive(link_entry[IDX_LT_DSTHASH])
if path_request_conditions:
if not link_entry[IDX_LT_DSTHASH] in path_requests:
path_requests[link_entry[IDX_LT_DSTHASH]] = blocked_if
if path_request_conditions:
with Transport.path_requests_lock:
if not link_entry[IDX_LT_DSTHASH] in path_requests:
path_requests[link_entry[IDX_LT_DSTHASH]] = blocked_if
if not RNS.Reticulum.transport_enabled():
# Drop current path if we are not a transport instance, to
# allow using higher-hop count paths or reused announces
# from newly adjacent transport instances.
Transport.expire_path(link_entry[IDX_LT_DSTHASH])
if not RNS.Reticulum.transport_enabled():
# Drop current path if we are not a transport instance, to
# allow using higher-hop count paths or reused announces
# from newly adjacent transport instances.
Transport.expire_path(link_entry[IDX_LT_DSTHASH])
# Cull the path table
stale_paths = []
for destination_hash in Transport.path_table:
destination_entry = Transport.path_table[destination_hash]
attached_interface = destination_entry[IDX_PT_RVCD_IF]
with Transport.path_table_lock:
for destination_hash in Transport.path_table:
destination_entry = Transport.path_table[destination_hash]
attached_interface = destination_entry[IDX_PT_RVCD_IF]
if attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.AP_PATH_TIME
elif attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.ROAMING_PATH_TIME
else:
destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.DESTINATION_TIMEOUT
if attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.AP_PATH_TIME
elif attached_interface != None and hasattr(attached_interface, "mode") and attached_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.ROAMING_PATH_TIME
else:
destination_expiry = destination_entry[IDX_PT_TIMESTAMP] + Transport.DESTINATION_TIMEOUT
if time.time() > destination_expiry:
stale_paths.append(destination_hash)
should_collect = True
RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG)
elif not attached_interface in Transport.interfaces:
stale_paths.append(destination_hash)
should_collect = True
RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG)
if time.time() > destination_expiry:
stale_paths.append(destination_hash)
should_collect = True
RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG)
elif not attached_interface in Transport.interfaces:
stale_paths.append(destination_hash)
should_collect = True
RNS.log("Path to "+RNS.prettyhexrep(destination_hash)+" was removed since the attached interface no longer exists", RNS.LOG_DEBUG)
# Cull the pending discovery path requests table
stale_discovery_path_requests = []
for destination_hash in Transport.discovery_path_requests:
entry = Transport.discovery_path_requests[destination_hash]
with Transport.discovery_pr_lock:
for destination_hash in Transport.discovery_path_requests:
entry = Transport.discovery_path_requests[destination_hash]
if time.time() > entry["timeout"]:
stale_discovery_path_requests.append(destination_hash)
should_collect = True
RNS.log("Waiting path request for "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG)
if time.time() > entry["timeout"]:
stale_discovery_path_requests.append(destination_hash)
should_collect = True
RNS.log("Waiting path request for "+RNS.prettyhexrep(destination_hash)+" timed out and was removed", RNS.LOG_DEBUG)
# Cull the tunnel table
stale_tunnels = []; ti = 0
for tunnel_id in Transport.tunnels:
tunnel_entry = Transport.tunnels[tunnel_id]
with Transport.tunnels_lock:
for tunnel_id in Transport.tunnels:
tunnel_entry = Transport.tunnels[tunnel_id]
expires = tunnel_entry[IDX_TT_EXPIRES]
if time.time() > expires:
stale_tunnels.append(tunnel_id)
should_collect = True
RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME)
else:
if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces:
RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_EXTREME)
tunnel_entry[IDX_TT_IF] = None
expires = tunnel_entry[IDX_TT_EXPIRES]
if time.time() > expires:
stale_tunnels.append(tunnel_id)
should_collect = True
RNS.log("Tunnel "+RNS.prettyhexrep(tunnel_id)+" timed out and was removed", RNS.LOG_EXTREME)
else:
if tunnel_entry[IDX_TT_IF] and not tunnel_entry[IDX_TT_IF] in Transport.interfaces:
RNS.log(f"Removing non-existent tunnel interface {tunnel_entry[IDX_TT_IF]}", RNS.LOG_EXTREME)
tunnel_entry[IDX_TT_IF] = None
stale_tunnel_paths = []
tunnel_paths = tunnel_entry[IDX_TT_PATHS]
for tunnel_path in tunnel_paths:
tunnel_path_entry = tunnel_paths[tunnel_path]
stale_tunnel_paths = []
tunnel_paths = tunnel_entry[IDX_TT_PATHS]
for tunnel_path in tunnel_paths:
tunnel_path_entry = tunnel_paths[tunnel_path]
if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT:
stale_tunnel_paths.append(tunnel_path)
should_collect = True
RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" timed out and was removed", RNS.LOG_EXTREME)
for tunnel_path in stale_tunnel_paths:
tunnel_paths.pop(tunnel_path)
ti += 1
if time.time() > tunnel_path_entry[0] + Transport.DESTINATION_TIMEOUT:
stale_tunnel_paths.append(tunnel_path)
should_collect = True
RNS.log("Tunnel path to "+RNS.prettyhexrep(tunnel_path)+" timed out and was removed", RNS.LOG_EXTREME)
for tunnel_path in stale_tunnel_paths:
tunnel_paths.pop(tunnel_path)
ti += 1
if ti > 0:
if ti == 1: RNS.log("Removed "+str(ti)+" tunnel path", RNS.LOG_EXTREME)
else: RNS.log("Removed "+str(ti)+" tunnel paths", RNS.LOG_EXTREME)
i = 0
for truncated_packet_hash in stale_reverse_entries:
Transport.reverse_table.pop(truncated_packet_hash)
i += 1
with Transport.reverse_table_lock:
for truncated_packet_hash in stale_reverse_entries:
if truncated_packet_hash in Transport.reverse_table: Transport.reverse_table.pop(truncated_packet_hash)
i += 1
if i > 0:
if i == 1: RNS.log("Released "+str(i)+" reverse table entry", RNS.LOG_EXTREME)
else: RNS.log("Released "+str(i)+" reverse table entries", RNS.LOG_EXTREME)
i = 0
for link_id in stale_links:
Transport.link_table.pop(link_id)
i += 1
with Transport.link_table_lock:
for link_id in stale_links:
Transport.link_table.pop(link_id)
i += 1
if i > 0:
if i == 1: RNS.log("Released "+str(i)+" link", RNS.LOG_EXTREME)
else: RNS.log("Released "+str(i)+" links", RNS.LOG_EXTREME)
i = 0
for destination_hash in stale_paths:
Transport.path_table.pop(destination_hash)
i += 1
with Transport.path_table_lock:
for destination_hash in stale_paths:
if destination_hash in Transport.path_table: Transport.path_table.pop(destination_hash)
i += 1
if i > 0:
if i == 1: RNS.log("Removed "+str(i)+" path", RNS.LOG_EXTREME)
else: RNS.log("Removed "+str(i)+" paths", RNS.LOG_EXTREME)
i = 0
for destination_hash in stale_discovery_path_requests:
Transport.discovery_path_requests.pop(destination_hash)
i += 1
with Transport.discovery_pr_lock:
for destination_hash in stale_discovery_path_requests:
Transport.discovery_path_requests.pop(destination_hash)
i += 1
if i > 0:
if i == 1: RNS.log("Removed "+str(i)+" waiting path request", RNS.LOG_EXTREME)
else: RNS.log("Removed "+str(i)+" waiting path requests", RNS.LOG_EXTREME)
i = 0
for tunnel_id in stale_tunnels:
Transport.tunnels.pop(tunnel_id)
i += 1
with Transport.tunnels_lock:
for tunnel_id in stale_tunnels:
Transport.tunnels.pop(tunnel_id)
i += 1
if i > 0:
if i == 1: RNS.log("Removed "+str(i)+" tunnel", RNS.LOG_EXTREME)
else: RNS.log("Removed "+str(i)+" tunnels", RNS.LOG_EXTREME)
i = 0
for destination_hash in stale_path_states:
Transport.path_states.pop(destination_hash)
i += 1
with Transport.path_states_lock:
for destination_hash in stale_path_states:
Transport.path_states.pop(destination_hash)
i += 1
if i > 0:
if i == 1: RNS.log("Removed "+str(i)+" path state entry", RNS.LOG_EXTREME)
@ -830,9 +857,13 @@ class Transport:
# Run interface-related jobs
if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval:
Transport.prioritize_interfaces()
for interface in Transport.interfaces:
interface.process_held_announces()
Transport.interface_last_jobs = time.time()
try:
for interface in Transport.interfaces:
interface.process_held_announces()
Transport.interface_last_jobs = time.time()
except Exception as e:
RNS.log(f"Error while processing held per-interface announces: {e}", RNS.LOG_WARNING)
RNS.log(f"Postponing until next job run", RNS.LOG_WARNING)
# Clean packet caches
if time.time() > Transport.cache_last_cleaned+Transport.cache_clean_interval:
@ -877,10 +908,6 @@ class Transport:
if should_collect: gc.collect()
else:
# Transport jobs were locked, do nothing
pass
except Exception as e:
RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -1754,22 +1781,23 @@ class Transport:
# If this is a path response from a local client,
# check if any external interfaces have pending
# path requests.
if packet.destination_hash in Transport.pending_local_path_requests:
desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash)
retransmit_timeout = now
retries = Transport.PATHFINDER_R
with Transport.pending_local_prs_lock:
if packet.destination_hash in Transport.pending_local_path_requests:
desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash)
retransmit_timeout = now
retries = Transport.PATHFINDER_R
Transport.announce_table[packet.destination_hash] = [
now,
retransmit_timeout,
retries,
received_from,
announce_hops,
packet,
local_rebroadcasts,
block_rebroadcasts,
attached_interface
]
Transport.announce_table[packet.destination_hash] = [
now,
retransmit_timeout,
retries,
received_from,
announce_hops,
packet,
local_rebroadcasts,
block_rebroadcasts,
attached_interface
]
# If we have any local clients connected, we re-
# transmit the announce to them immediately
@ -2467,44 +2495,45 @@ class Transport:
@staticmethod
def expire_path(destination_hash):
if destination_hash in Transport.path_table:
Transport.path_table[destination_hash][IDX_PT_TIMESTAMP] = 0
Transport.tables_last_culled = 0
return True
else:
return False
with Transport.path_table_lock:
if destination_hash in Transport.path_table:
Transport.path_table[destination_hash][IDX_PT_TIMESTAMP] = 0
Transport.tables_last_culled = 0
return True
else: return False
@staticmethod
def mark_path_unresponsive(destination_hash):
if destination_hash in Transport.path_table:
Transport.path_states[destination_hash] = Transport.STATE_UNRESPONSIVE
with Transport.path_states_lock: Transport.path_states[destination_hash] = Transport.STATE_UNRESPONSIVE
return True
else:
return False
else: return False
@staticmethod
def mark_path_responsive(destination_hash):
if destination_hash in Transport.path_table:
Transport.path_states[destination_hash] = Transport.STATE_RESPONSIVE
with Transport.path_states_lock: Transport.path_states[destination_hash] = Transport.STATE_RESPONSIVE
return True
else:
return False
else: return False
@staticmethod
def mark_path_unknown_state(destination_hash):
if destination_hash in Transport.path_table:
Transport.path_states[destination_hash] = Transport.STATE_UNKNOWN
with Transport.path_states_lock: Transport.path_states[destination_hash] = Transport.STATE_UNKNOWN
return True
else:
return False
else: return False
@staticmethod
def path_is_unresponsive(destination_hash):
if destination_hash in Transport.path_states:
if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE:
return True
with Transport.path_states_lock:
if destination_hash in Transport.path_states:
if Transport.path_states[destination_hash] == Transport.STATE_UNRESPONSIVE: return True
return False
return False
@staticmethod
def await_path(destination_hash, timeout=None, on_interface=None):
@ -2533,28 +2562,19 @@ class Transport:
:param destination_hash: A destination hash as *bytes*.
:param on_interface: If specified, the path request will only be sent on this interface. In normal use, Reticulum handles this automatically, and this parameter should not be used.
"""
if tag == None:
request_tag = RNS.Identity.get_random_hash()
else:
request_tag = tag
if tag == None: request_tag = RNS.Identity.get_random_hash()
else: request_tag = tag
if RNS.Reticulum.transport_enabled():
path_request_data = destination_hash+Transport.identity.hash+request_tag
else:
path_request_data = destination_hash+request_tag
if RNS.Reticulum.transport_enabled(): path_request_data = destination_hash+Transport.identity.hash+request_tag
else: path_request_data = destination_hash+request_tag
path_request_dst = RNS.Destination(None, RNS.Destination.OUT, RNS.Destination.PLAIN, Transport.APP_NAME, "path", "request")
packet = RNS.Packet(path_request_dst, path_request_data, packet_type = RNS.Packet.DATA, transport_type = RNS.Transport.BROADCAST, header_type = RNS.Packet.HEADER_1, attached_interface = on_interface)
if on_interface != None and recursive:
if not hasattr(on_interface, "announce_cap"):
on_interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP
if not hasattr(on_interface, "announce_allowed_at"):
on_interface.announce_allowed_at = 0
if not hasattr(on_interface, "announce_queue"):
on_interface.announce_queue = []
if not hasattr(on_interface, "announce_cap"): on_interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP
if not hasattr(on_interface, "announce_allowed_at"): on_interface.announce_allowed_at = 0
if not hasattr(on_interface, "announce_queue"): on_interface.announce_queue = []
queued_announces = True if len(on_interface.announce_queue) > 0 else False
if queued_announces:
@ -2581,8 +2601,7 @@ class Transport:
if isinstance(data, list) and len(data) > 0:
response = []
response.append(Transport.owner.get_interface_stats())
if data[0] == True:
response.append(Transport.owner.get_link_count())
if data[0] == True: response.append(Transport.owner.get_link_count())
return response
@ -2601,10 +2620,8 @@ class Transport:
command = data[0]
destination_hash = None
max_hops = None
if len(data) > 1:
destination_hash = data[1]
if len(data) > 2:
max_hops = data[2]
if len(data) > 1: destination_hash = data[1]
if len(data) > 2: max_hops = data[2]
if command == "table":
table = Transport.owner.get_path_table(max_hops=max_hops)
@ -2658,25 +2675,21 @@ class Transport:
unique_tag = destination_hash+tag_bytes
if not unique_tag in Transport.discovery_pr_tags:
Transport.discovery_pr_tags.append(unique_tag)
with Transport.discovery_pr_tags_lock:
if not unique_tag in Transport.discovery_pr_tags:
Transport.discovery_pr_tags.append(unique_tag)
Transport.path_request(
destination_hash,
Transport.from_local_client(packet),
packet.receiving_interface,
requestor_transport_id = requesting_transport_instance,
tag=tag_bytes
)
Transport.path_request(destination_hash,
Transport.from_local_client(packet),
packet.receiving_interface,
requestor_transport_id = requesting_transport_instance,
tag=tag_bytes)
else:
RNS.log("Ignoring duplicate path request for "+RNS.prettyhexrep(destination_hash)+" with tag "+RNS.prettyhexrep(unique_tag), RNS.LOG_DEBUG)
else: RNS.log("Ignoring duplicate path request for "+RNS.prettyhexrep(destination_hash)+" with tag "+RNS.prettyhexrep(unique_tag), RNS.LOG_DEBUG)
else:
RNS.log("Ignoring tagless path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
else: RNS.log("Ignoring tagless path request for "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while handling path request. The contained exception was: "+str(e), RNS.LOG_ERROR)
except Exception as e: RNS.log("Error while handling path request. The contained exception was: "+str(e), RNS.LOG_ERROR)
@staticmethod
def path_request(destination_hash, is_from_local_client, attached_interface, requestor_transport_id=None, tag=None):
@ -2685,10 +2698,9 @@ class Transport:
if attached_interface != None:
if RNS.Reticulum.transport_enabled() and attached_interface.mode in RNS.Interfaces.Interface.Interface.DISCOVER_PATHS_FOR:
should_search_for_unknown = True
interface_str = " on "+str(attached_interface)
else:
interface_str = ""
else: interface_str = ""
RNS.log("Path request for "+RNS.prettyhexrep(destination_hash)+interface_str, RNS.LOG_DEBUG)
@ -2699,7 +2711,8 @@ class Transport:
if Transport.is_local_client_interface(destination_interface):
destination_exists_on_local_client = True
Transport.pending_local_path_requests[destination_hash] = attached_interface
with Transport.pending_local_prs_lock:
Transport.pending_local_path_requests[destination_hash] = attached_interface
local_destination = next((d for d in Transport.destinations if d.hash == destination_hash), None)
if local_destination != None: