mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-04-27 22:25:37 +00:00
Force synchronous processing for entire announce logic flow
This commit is contained in:
parent
c9b6dc007a
commit
326d719a49
1 changed files with 298 additions and 289 deletions
581
RNS/Transport.py
581
RNS/Transport.py
|
|
@ -37,6 +37,7 @@ import struct
|
|||
import inspect
|
||||
import threading
|
||||
from time import sleep
|
||||
from threading import Lock
|
||||
from .vendor import umsgpack as umsgpack
|
||||
from RNS.Interfaces.BackboneInterface import BackboneInterface
|
||||
|
||||
|
|
@ -154,6 +155,7 @@ class Transport:
|
|||
tables_cull_interval = 5.0
|
||||
interface_last_jobs = 0.0
|
||||
interface_jobs_interval = 5.0
|
||||
inbound_announce_lock = Lock()
|
||||
|
||||
traffic_rxb = 0
|
||||
traffic_txb = 0
|
||||
|
|
@ -1524,325 +1526,332 @@ class Transport:
|
|||
|
||||
random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10]
|
||||
random_blobs = []
|
||||
if packet.destination_hash in Transport.path_table:
|
||||
random_blobs = Transport.path_table[packet.destination_hash][IDX_PT_RANDBLOBS]
|
||||
with Transport.inbound_announce_lock:
|
||||
if packet.destination_hash in Transport.path_table:
|
||||
random_blobs = Transport.path_table[packet.destination_hash][IDX_PT_RANDBLOBS]
|
||||
|
||||
# If we already have a path to the announced
|
||||
# destination, but the hop count is equal or
|
||||
# less, we'll update our tables.
|
||||
if packet.hops <= Transport.path_table[packet.destination_hash][IDX_PT_HOPS]:
|
||||
# Make sure we haven't heard the random
|
||||
# blob before, so announces can't be
|
||||
# replayed to forge paths.
|
||||
# TODO: Check whether this approach works
|
||||
# under all circumstances
|
||||
path_timebase = Transport.timebase_from_random_blobs(random_blobs)
|
||||
if not random_blob in random_blobs and announce_emitted > path_timebase:
|
||||
Transport.mark_path_unknown_state(packet.destination_hash)
|
||||
should_add = True
|
||||
else:
|
||||
should_add = False
|
||||
else:
|
||||
# If an announce arrives with a larger hop
|
||||
# count than we already have in the table,
|
||||
# ignore it, unless the path is expired, or
|
||||
# the emission timestamp is more recent.
|
||||
now = time.time()
|
||||
path_expires = Transport.path_table[packet.destination_hash][IDX_PT_EXPIRES]
|
||||
|
||||
path_announce_emitted = 0
|
||||
for path_random_blob in random_blobs:
|
||||
path_announce_emitted = max(path_announce_emitted, int.from_bytes(path_random_blob[5:10], "big"))
|
||||
if path_announce_emitted >= announce_emitted:
|
||||
break
|
||||
|
||||
# If the path has expired, consider this
|
||||
# announce for adding to the path table.
|
||||
if (now >= path_expires):
|
||||
# We check that the announce is
|
||||
# different from ones we've already heard,
|
||||
# to avoid loops in the network
|
||||
if not random_blob in random_blobs:
|
||||
# TODO: Check that this ^ approach actually
|
||||
# works under all circumstances
|
||||
RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce due to expired path", RNS.LOG_DEBUG)
|
||||
# If we already have a path to the announced
|
||||
# destination, but the hop count is equal or
|
||||
# less, we'll update our tables.
|
||||
if packet.hops <= Transport.path_table[packet.destination_hash][IDX_PT_HOPS]:
|
||||
# Make sure we haven't heard the random
|
||||
# blob before, so announces can't be
|
||||
# replayed to forge paths.
|
||||
# TODO: Check whether this approach works
|
||||
# under all circumstances
|
||||
path_timebase = Transport.timebase_from_random_blobs(random_blobs)
|
||||
if not random_blob in random_blobs and announce_emitted > path_timebase:
|
||||
Transport.mark_path_unknown_state(packet.destination_hash)
|
||||
should_add = True
|
||||
else:
|
||||
should_add = False
|
||||
else:
|
||||
# If the path is not expired, but the emission
|
||||
# is more recent, and we haven't already heard
|
||||
# this announce before, update the path table.
|
||||
if (announce_emitted > path_announce_emitted):
|
||||
# If an announce arrives with a larger hop
|
||||
# count than we already have in the table,
|
||||
# ignore it, unless the path is expired, or
|
||||
# the emission timestamp is more recent.
|
||||
now = time.time()
|
||||
path_expires = Transport.path_table[packet.destination_hash][IDX_PT_EXPIRES]
|
||||
|
||||
path_announce_emitted = 0
|
||||
for path_random_blob in random_blobs:
|
||||
path_announce_emitted = max(path_announce_emitted, int.from_bytes(path_random_blob[5:10], "big"))
|
||||
if path_announce_emitted >= announce_emitted:
|
||||
break
|
||||
|
||||
# If the path has expired, consider this
|
||||
# announce for adding to the path table.
|
||||
if (now >= path_expires):
|
||||
# We check that the announce is
|
||||
# different from ones we've already heard,
|
||||
# to avoid loops in the network
|
||||
if not random_blob in random_blobs:
|
||||
RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since it was more recently emitted", RNS.LOG_DEBUG)
|
||||
# TODO: Check that this ^ approach actually
|
||||
# works under all circumstances
|
||||
RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce due to expired path", RNS.LOG_DEBUG)
|
||||
Transport.mark_path_unknown_state(packet.destination_hash)
|
||||
should_add = True
|
||||
else:
|
||||
should_add = False
|
||||
|
||||
# If we have already heard this announce before,
|
||||
# but the path has been marked as unresponsive
|
||||
# by a failed communications attempt or similar,
|
||||
# allow updating the path table to this one.
|
||||
elif announce_emitted == path_announce_emitted:
|
||||
if Transport.path_is_unresponsive(packet.destination_hash):
|
||||
RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since previously tried path was unresponsive", RNS.LOG_DEBUG)
|
||||
should_add = True
|
||||
else:
|
||||
should_add = False
|
||||
else:
|
||||
# If the path is not expired, but the emission
|
||||
# is more recent, and we haven't already heard
|
||||
# this announce before, update the path table.
|
||||
if (announce_emitted > path_announce_emitted):
|
||||
if not random_blob in random_blobs:
|
||||
RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since it was more recently emitted", RNS.LOG_DEBUG)
|
||||
Transport.mark_path_unknown_state(packet.destination_hash)
|
||||
should_add = True
|
||||
else:
|
||||
should_add = False
|
||||
|
||||
# If we have already heard this announce before,
|
||||
# but the path has been marked as unresponsive
|
||||
# by a failed communications attempt or similar,
|
||||
# allow updating the path table to this one.
|
||||
elif announce_emitted == path_announce_emitted:
|
||||
if Transport.path_is_unresponsive(packet.destination_hash):
|
||||
RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since previously tried path was unresponsive", RNS.LOG_DEBUG)
|
||||
should_add = True
|
||||
else:
|
||||
should_add = False
|
||||
|
||||
else:
|
||||
# If this destination is unknown in our table
|
||||
# we should add it
|
||||
should_add = True
|
||||
else:
|
||||
# If this destination is unknown in our table
|
||||
# we should add it
|
||||
should_add = True
|
||||
|
||||
if should_add:
|
||||
now = time.time()
|
||||
if should_add:
|
||||
now = time.time()
|
||||
|
||||
rate_blocked = False
|
||||
if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None:
|
||||
if not packet.destination_hash in Transport.announce_rate_table:
|
||||
rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]}
|
||||
Transport.announce_rate_table[packet.destination_hash] = rate_entry
|
||||
|
||||
else:
|
||||
rate_entry = Transport.announce_rate_table[packet.destination_hash]
|
||||
rate_entry["timestamps"].append(now)
|
||||
|
||||
while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS:
|
||||
rate_entry["timestamps"].pop(0)
|
||||
|
||||
current_rate = now - rate_entry["last"]
|
||||
|
||||
if now > rate_entry["blocked_until"]:
|
||||
if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1
|
||||
else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1)
|
||||
|
||||
if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace:
|
||||
rate_target = packet.receiving_interface.announce_rate_target
|
||||
rate_penalty = packet.receiving_interface.announce_rate_penalty
|
||||
rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty
|
||||
rate_blocked = True
|
||||
else:
|
||||
rate_entry["last"] = now
|
||||
rate_blocked = False
|
||||
if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None:
|
||||
if not packet.destination_hash in Transport.announce_rate_table:
|
||||
rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]}
|
||||
Transport.announce_rate_table[packet.destination_hash] = rate_entry
|
||||
|
||||
else:
|
||||
rate_blocked = True
|
||||
|
||||
rate_entry = Transport.announce_rate_table[packet.destination_hash]
|
||||
rate_entry["timestamps"].append(now)
|
||||
|
||||
retries = 0
|
||||
announce_hops = packet.hops
|
||||
local_rebroadcasts = 0
|
||||
block_rebroadcasts = False
|
||||
attached_interface = None
|
||||
|
||||
retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW)
|
||||
while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS:
|
||||
rate_entry["timestamps"].pop(0)
|
||||
|
||||
if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
|
||||
expires = now + Transport.AP_PATH_TIME
|
||||
elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
|
||||
expires = now + Transport.ROAMING_PATH_TIME
|
||||
else:
|
||||
expires = now + Transport.PATHFINDER_E
|
||||
|
||||
if not random_blob in random_blobs:
|
||||
random_blobs.append(random_blob)
|
||||
random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:]
|
||||
current_rate = now - rate_entry["last"]
|
||||
|
||||
if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE:
|
||||
# Insert announce into announce table for retransmission
|
||||
if now > rate_entry["blocked_until"]:
|
||||
if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1
|
||||
else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1)
|
||||
|
||||
if rate_blocked:
|
||||
RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG)
|
||||
if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace:
|
||||
rate_target = packet.receiving_interface.announce_rate_target
|
||||
rate_penalty = packet.receiving_interface.announce_rate_penalty
|
||||
rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty
|
||||
rate_blocked = True
|
||||
else:
|
||||
rate_entry["last"] = now
|
||||
|
||||
else:
|
||||
rate_blocked = True
|
||||
|
||||
|
||||
retries = 0
|
||||
announce_hops = packet.hops
|
||||
local_rebroadcasts = 0
|
||||
block_rebroadcasts = False
|
||||
attached_interface = None
|
||||
|
||||
retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW)
|
||||
|
||||
if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
|
||||
expires = now + Transport.AP_PATH_TIME
|
||||
elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
|
||||
expires = now + Transport.ROAMING_PATH_TIME
|
||||
else:
|
||||
if Transport.from_local_client(packet):
|
||||
# If the announce is from a local client,
|
||||
# it is announced immediately, but only one time.
|
||||
expires = now + Transport.PATHFINDER_E
|
||||
|
||||
if not random_blob in random_blobs:
|
||||
random_blobs.append(random_blob)
|
||||
random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:]
|
||||
|
||||
if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE:
|
||||
# Insert announce into announce table for retransmission
|
||||
|
||||
if rate_blocked:
|
||||
RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG)
|
||||
|
||||
else:
|
||||
if Transport.from_local_client(packet):
|
||||
# If the announce is from a local client,
|
||||
# it is announced immediately, but only one time.
|
||||
retransmit_timeout = now
|
||||
retries = Transport.PATHFINDER_R
|
||||
|
||||
Transport.announce_table[packet.destination_hash] = [
|
||||
now, # 0: IDX_AT_TIMESTAMP
|
||||
retransmit_timeout, # 1: IDX_AT_RTRNS_TMO
|
||||
retries, # 2: IDX_AT_RETRIES
|
||||
received_from, # 3: IDX_AT_RCVD_IF
|
||||
announce_hops, # 4: IDX_AT_HOPS
|
||||
packet, # 5: IDX_AT_PACKET
|
||||
local_rebroadcasts, # 6: IDX_AT_LCL_RBRD
|
||||
block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD
|
||||
attached_interface, # 8: IDX_AT_ATTCHD_IF
|
||||
]
|
||||
|
||||
# TODO: Check from_local_client once and store result
|
||||
elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
|
||||
# If this is a path response from a local client,
|
||||
# check if any external interfaces have pending
|
||||
# path requests.
|
||||
if packet.destination_hash in Transport.pending_local_path_requests:
|
||||
desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash)
|
||||
retransmit_timeout = now
|
||||
retries = Transport.PATHFINDER_R
|
||||
|
||||
Transport.announce_table[packet.destination_hash] = [
|
||||
now, # 0: IDX_AT_TIMESTAMP
|
||||
retransmit_timeout, # 1: IDX_AT_RTRNS_TMO
|
||||
retries, # 2: IDX_AT_RETRIES
|
||||
received_from, # 3: IDX_AT_RCVD_IF
|
||||
announce_hops, # 4: IDX_AT_HOPS
|
||||
packet, # 5: IDX_AT_PACKET
|
||||
local_rebroadcasts, # 6: IDX_AT_LCL_RBRD
|
||||
block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD
|
||||
attached_interface, # 8: IDX_AT_ATTCHD_IF
|
||||
]
|
||||
Transport.announce_table[packet.destination_hash] = [
|
||||
now,
|
||||
retransmit_timeout,
|
||||
retries,
|
||||
received_from,
|
||||
announce_hops,
|
||||
packet,
|
||||
local_rebroadcasts,
|
||||
block_rebroadcasts,
|
||||
attached_interface
|
||||
]
|
||||
|
||||
# TODO: Check from_local_client once and store result
|
||||
elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
|
||||
# If this is a path response from a local client,
|
||||
# check if any external interfaces have pending
|
||||
# path requests.
|
||||
if packet.destination_hash in Transport.pending_local_path_requests:
|
||||
desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash)
|
||||
retransmit_timeout = now
|
||||
retries = Transport.PATHFINDER_R
|
||||
|
||||
Transport.announce_table[packet.destination_hash] = [
|
||||
now,
|
||||
retransmit_timeout,
|
||||
retries,
|
||||
received_from,
|
||||
announce_hops,
|
||||
packet,
|
||||
local_rebroadcasts,
|
||||
block_rebroadcasts,
|
||||
attached_interface
|
||||
]
|
||||
|
||||
# If we have any local clients connected, we re-
|
||||
# transmit the announce to them immediately
|
||||
if (len(Transport.local_client_interfaces)):
|
||||
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
||||
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
|
||||
announce_destination.hash = packet.destination_hash
|
||||
announce_destination.hexhash = announce_destination.hash.hex()
|
||||
announce_context = RNS.Packet.NONE
|
||||
announce_data = packet.data
|
||||
|
||||
# TODO: Shouldn't the context be PATH_RESPONSE in the first case here?
|
||||
if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
|
||||
for local_interface in Transport.local_client_interfaces:
|
||||
if packet.receiving_interface != local_interface:
|
||||
new_announce = RNS.Packet(
|
||||
announce_destination,
|
||||
announce_data,
|
||||
RNS.Packet.ANNOUNCE,
|
||||
context = announce_context,
|
||||
header_type = RNS.Packet.HEADER_2,
|
||||
transport_type = Transport.TRANSPORT,
|
||||
transport_id = Transport.identity.hash,
|
||||
attached_interface = local_interface,
|
||||
context_flag = packet.context_flag,
|
||||
)
|
||||
|
||||
new_announce.hops = packet.hops
|
||||
new_announce.send()
|
||||
|
||||
else:
|
||||
for local_interface in Transport.local_client_interfaces:
|
||||
if packet.receiving_interface != local_interface:
|
||||
new_announce = RNS.Packet(
|
||||
announce_destination,
|
||||
announce_data,
|
||||
RNS.Packet.ANNOUNCE,
|
||||
context = announce_context,
|
||||
header_type = RNS.Packet.HEADER_2,
|
||||
transport_type = Transport.TRANSPORT,
|
||||
transport_id = Transport.identity.hash,
|
||||
attached_interface = local_interface,
|
||||
context_flag = packet.context_flag,
|
||||
)
|
||||
|
||||
new_announce.hops = packet.hops
|
||||
new_announce.send()
|
||||
|
||||
# If we have any waiting discovery path requests
|
||||
# for this destination, we retransmit to that
|
||||
# interface immediately
|
||||
if packet.destination_hash in Transport.discovery_path_requests:
|
||||
pr_entry = Transport.discovery_path_requests[packet.destination_hash]
|
||||
attached_interface = pr_entry["requesting_interface"]
|
||||
|
||||
interface_str = " on "+str(attached_interface)
|
||||
|
||||
RNS.log("Got matching announce, answering waiting discovery path request for "+RNS.prettyhexrep(packet.destination_hash)+interface_str, RNS.LOG_DEBUG)
|
||||
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
||||
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
|
||||
announce_destination.hash = packet.destination_hash
|
||||
announce_destination.hexhash = announce_destination.hash.hex()
|
||||
announce_context = RNS.Packet.NONE
|
||||
announce_data = packet.data
|
||||
|
||||
new_announce = RNS.Packet(
|
||||
announce_destination,
|
||||
announce_data,
|
||||
RNS.Packet.ANNOUNCE,
|
||||
context = RNS.Packet.PATH_RESPONSE,
|
||||
header_type = RNS.Packet.HEADER_2,
|
||||
transport_type = Transport.TRANSPORT,
|
||||
transport_id = Transport.identity.hash,
|
||||
attached_interface = attached_interface,
|
||||
context_flag = packet.context_flag,
|
||||
)
|
||||
|
||||
new_announce.hops = packet.hops
|
||||
new_announce.send()
|
||||
|
||||
if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce")
|
||||
path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash]
|
||||
Transport.path_table[packet.destination_hash] = path_table_entry
|
||||
RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG)
|
||||
|
||||
# If the receiving interface is a tunnel, we add the
|
||||
# announce to the tunnels table
|
||||
if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None:
|
||||
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
|
||||
paths = tunnel_entry[IDX_TT_PATHS]
|
||||
paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash]
|
||||
expires = time.time() + Transport.DESTINATION_TIMEOUT
|
||||
tunnel_entry[IDX_TT_EXPIRES] = expires
|
||||
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG)
|
||||
|
||||
# Call externally registered callbacks from apps
|
||||
# wanting to know when an announce arrives
|
||||
for handler in Transport.announce_handlers:
|
||||
try:
|
||||
# Check that the announced destination matches
|
||||
# the handlers aspect filter
|
||||
execute_callback = False
|
||||
# If we have any local clients connected, we re-
|
||||
# transmit the announce to them immediately
|
||||
if (len(Transport.local_client_interfaces)):
|
||||
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
||||
if handler.aspect_filter == None:
|
||||
# If the handlers aspect filter is set to
|
||||
# None, we execute the callback in all cases
|
||||
execute_callback = True
|
||||
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
|
||||
announce_destination.hash = packet.destination_hash
|
||||
announce_destination.hexhash = announce_destination.hash.hex()
|
||||
announce_context = RNS.Packet.NONE
|
||||
announce_data = packet.data
|
||||
|
||||
# TODO: Shouldn't the context be PATH_RESPONSE in the first case here?
|
||||
if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE:
|
||||
for local_interface in Transport.local_client_interfaces:
|
||||
if packet.receiving_interface != local_interface:
|
||||
new_announce = RNS.Packet(
|
||||
announce_destination,
|
||||
announce_data,
|
||||
RNS.Packet.ANNOUNCE,
|
||||
context = announce_context,
|
||||
header_type = RNS.Packet.HEADER_2,
|
||||
transport_type = Transport.TRANSPORT,
|
||||
transport_id = Transport.identity.hash,
|
||||
attached_interface = local_interface,
|
||||
context_flag = packet.context_flag,
|
||||
)
|
||||
|
||||
new_announce.hops = packet.hops
|
||||
new_announce.send()
|
||||
|
||||
else:
|
||||
handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity)
|
||||
if packet.destination_hash == handler_expected_hash:
|
||||
for local_interface in Transport.local_client_interfaces:
|
||||
if packet.receiving_interface != local_interface:
|
||||
new_announce = RNS.Packet(
|
||||
announce_destination,
|
||||
announce_data,
|
||||
RNS.Packet.ANNOUNCE,
|
||||
context = announce_context,
|
||||
header_type = RNS.Packet.HEADER_2,
|
||||
transport_type = Transport.TRANSPORT,
|
||||
transport_id = Transport.identity.hash,
|
||||
attached_interface = local_interface,
|
||||
context_flag = packet.context_flag,
|
||||
)
|
||||
|
||||
new_announce.hops = packet.hops
|
||||
new_announce.send()
|
||||
|
||||
# If we have any waiting discovery path requests
|
||||
# for this destination, we retransmit to that
|
||||
# interface immediately
|
||||
if packet.destination_hash in Transport.discovery_path_requests:
|
||||
pr_entry = Transport.discovery_path_requests[packet.destination_hash]
|
||||
attached_interface = pr_entry["requesting_interface"]
|
||||
|
||||
interface_str = " on "+str(attached_interface)
|
||||
|
||||
RNS.log("Got matching announce, answering waiting discovery path request for "+RNS.prettyhexrep(packet.destination_hash)+interface_str, RNS.LOG_DEBUG)
|
||||
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
||||
announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown");
|
||||
announce_destination.hash = packet.destination_hash
|
||||
announce_destination.hexhash = announce_destination.hash.hex()
|
||||
announce_context = RNS.Packet.NONE
|
||||
announce_data = packet.data
|
||||
|
||||
new_announce = RNS.Packet(
|
||||
announce_destination,
|
||||
announce_data,
|
||||
RNS.Packet.ANNOUNCE,
|
||||
context = RNS.Packet.PATH_RESPONSE,
|
||||
header_type = RNS.Packet.HEADER_2,
|
||||
transport_type = Transport.TRANSPORT,
|
||||
transport_id = Transport.identity.hash,
|
||||
attached_interface = attached_interface,
|
||||
context_flag = packet.context_flag,
|
||||
)
|
||||
|
||||
new_announce.hops = packet.hops
|
||||
new_announce.send()
|
||||
|
||||
if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce")
|
||||
path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash]
|
||||
Transport.path_table[packet.destination_hash] = path_table_entry
|
||||
RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG)
|
||||
|
||||
# If the receiving interface is a tunnel, we add the
|
||||
# announce to the tunnels table
|
||||
if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None:
|
||||
tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id]
|
||||
paths = tunnel_entry[IDX_TT_PATHS]
|
||||
paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash]
|
||||
expires = time.time() + Transport.DESTINATION_TIMEOUT
|
||||
tunnel_entry[IDX_TT_EXPIRES] = expires
|
||||
RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG)
|
||||
|
||||
# Call externally registered callbacks from apps
|
||||
# wanting to know when an announce arrives
|
||||
for handler in Transport.announce_handlers:
|
||||
try:
|
||||
# Check that the announced destination matches
|
||||
# the handlers aspect filter
|
||||
execute_callback = False
|
||||
announce_identity = RNS.Identity.recall(packet.destination_hash)
|
||||
if handler.aspect_filter == None:
|
||||
# If the handlers aspect filter is set to
|
||||
# None, we execute the callback in all cases
|
||||
execute_callback = True
|
||||
|
||||
# If this is a path response, check whether the
|
||||
# handler wants to receive it.
|
||||
if packet.context == RNS.Packet.PATH_RESPONSE:
|
||||
if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True:
|
||||
pass
|
||||
else:
|
||||
execute_callback = False
|
||||
handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity)
|
||||
if packet.destination_hash == handler_expected_hash:
|
||||
execute_callback = True
|
||||
|
||||
if execute_callback:
|
||||
# If this is a path response, check whether the
|
||||
# handler wants to receive it.
|
||||
if packet.context == RNS.Packet.PATH_RESPONSE:
|
||||
if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True:
|
||||
pass
|
||||
else:
|
||||
execute_callback = False
|
||||
|
||||
if len(inspect.signature(handler.received_announce).parameters) == 3:
|
||||
handler.received_announce(destination_hash=packet.destination_hash,
|
||||
announced_identity=announce_identity,
|
||||
app_data=RNS.Identity.recall_app_data(packet.destination_hash))
|
||||
|
||||
elif len(inspect.signature(handler.received_announce).parameters) == 4:
|
||||
handler.received_announce(destination_hash=packet.destination_hash,
|
||||
announced_identity=announce_identity,
|
||||
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
|
||||
announce_packet_hash = packet.packet_hash)
|
||||
|
||||
elif len(inspect.signature(handler.received_announce).parameters) == 5:
|
||||
handler.received_announce(destination_hash=packet.destination_hash,
|
||||
announced_identity=announce_identity,
|
||||
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
|
||||
announce_packet_hash = packet.packet_hash,
|
||||
is_path_response = packet.context == RNS.Packet.PATH_RESPONSE)
|
||||
else:
|
||||
raise TypeError("Invalid signature for announce handler callback")
|
||||
if execute_callback:
|
||||
if len(inspect.signature(handler.received_announce).parameters) == 3:
|
||||
def job():
|
||||
handler.received_announce(destination_hash=packet.destination_hash,
|
||||
announced_identity=announce_identity,
|
||||
app_data=RNS.Identity.recall_app_data(packet.destination_hash))
|
||||
threading.Thread(target=job, daemon=True).start()
|
||||
|
||||
elif len(inspect.signature(handler.received_announce).parameters) == 4:
|
||||
def job():
|
||||
handler.received_announce(destination_hash=packet.destination_hash,
|
||||
announced_identity=announce_identity,
|
||||
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
|
||||
announce_packet_hash = packet.packet_hash)
|
||||
threading.Thread(target=job, daemon=True).start()
|
||||
|
||||
elif len(inspect.signature(handler.received_announce).parameters) == 5:
|
||||
def job():
|
||||
handler.received_announce(destination_hash=packet.destination_hash,
|
||||
announced_identity=announce_identity,
|
||||
app_data=RNS.Identity.recall_app_data(packet.destination_hash),
|
||||
announce_packet_hash = packet.packet_hash,
|
||||
is_path_response = packet.context == RNS.Packet.PATH_RESPONSE)
|
||||
threading.Thread(target=job, daemon=True).start()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR)
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
else:
|
||||
raise TypeError("Invalid signature for announce handler callback")
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR)
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
# Handling for link requests to local destinations
|
||||
elif packet.packet_type == RNS.Packet.LINKREQUEST:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue