mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-04-27 14:20:35 +00:00
Fixed invalid ingress control burst activation and subsequent path resolution failure due to incorrect announce frequency calculation
This commit is contained in:
parent
739523d559
commit
2445d18149
9 changed files with 81 additions and 58 deletions
|
|
@ -530,6 +530,16 @@ class AutoInterface(Interface):
|
|||
spawned_interface = AutoInterfacePeer(self, addr, ifname)
|
||||
spawned_interface.OUT = self.OUT
|
||||
spawned_interface.IN = self.IN
|
||||
|
||||
spawned_interface.ingress_control = self.ingress_control
|
||||
spawned_interface.ic_max_held_announces = self.ic_max_held_announces
|
||||
spawned_interface.ic_burst_hold = self.ic_burst_hold
|
||||
spawned_interface.ic_burst_freq = self.ic_burst_freq
|
||||
spawned_interface.ic_burst_freq_new = self.ic_burst_freq_new
|
||||
spawned_interface.ic_new_time = self.ic_new_time
|
||||
spawned_interface.ic_burst_penalty = self.ic_burst_penalty
|
||||
spawned_interface.ic_held_release_interval = self.ic_held_release_interval
|
||||
|
||||
spawned_interface.parent_interface = self
|
||||
spawned_interface.bitrate = self.bitrate
|
||||
|
||||
|
|
|
|||
|
|
@ -344,6 +344,16 @@ class BackboneInterface(Interface):
|
|||
spawned_interface = BackboneClientInterface(self.owner, spawned_configuration, connected_socket=socket)
|
||||
spawned_interface.OUT = self.OUT
|
||||
spawned_interface.IN = self.IN
|
||||
|
||||
spawned_interface.ingress_control = self.ingress_control
|
||||
spawned_interface.ic_max_held_announces = self.ic_max_held_announces
|
||||
spawned_interface.ic_burst_hold = self.ic_burst_hold
|
||||
spawned_interface.ic_burst_freq = self.ic_burst_freq
|
||||
spawned_interface.ic_burst_freq_new = self.ic_burst_freq_new
|
||||
spawned_interface.ic_new_time = self.ic_new_time
|
||||
spawned_interface.ic_burst_penalty = self.ic_burst_penalty
|
||||
spawned_interface.ic_held_release_interval = self.ic_held_release_interval
|
||||
|
||||
spawned_interface.socket = socket
|
||||
spawned_interface.target_ip = socket.getpeername()[0]
|
||||
spawned_interface.target_port = str(socket.getpeername()[1])
|
||||
|
|
|
|||
|
|
@ -948,6 +948,16 @@ class I2PInterface(Interface):
|
|||
spawned_interface = I2PInterfacePeer(self, self.owner, interface_name, connected_socket=handler.request)
|
||||
spawned_interface.OUT = True
|
||||
spawned_interface.IN = True
|
||||
|
||||
spawned_interface.ingress_control = self.ingress_control
|
||||
spawned_interface.ic_max_held_announces = self.ic_max_held_announces
|
||||
spawned_interface.ic_burst_hold = self.ic_burst_hold
|
||||
spawned_interface.ic_burst_freq = self.ic_burst_freq
|
||||
spawned_interface.ic_burst_freq_new = self.ic_burst_freq_new
|
||||
spawned_interface.ic_new_time = self.ic_new_time
|
||||
spawned_interface.ic_burst_penalty = self.ic_burst_penalty
|
||||
spawned_interface.ic_held_release_interval = self.ic_held_release_interval
|
||||
|
||||
spawned_interface.parent_interface = self
|
||||
spawned_interface.online = True
|
||||
spawned_interface.bitrate = self.bitrate
|
||||
|
|
|
|||
|
|
@ -55,8 +55,8 @@ class Interface:
|
|||
|
||||
# How many samples to use for announce
|
||||
# frequency calculations
|
||||
IA_FREQ_SAMPLES = 6
|
||||
OA_FREQ_SAMPLES = 6
|
||||
IA_FREQ_SAMPLES = 12
|
||||
OA_FREQ_SAMPLES = 12
|
||||
|
||||
# Maximum amount of ingress limited announces
|
||||
# to hold at any given time.
|
||||
|
|
@ -71,6 +71,7 @@ class Interface:
|
|||
IC_BURST_HOLD = 1*60
|
||||
IC_BURST_PENALTY = 5*60
|
||||
IC_HELD_RELEASE_INTERVAL = 30
|
||||
IC_DEQUE_MIN_SAMPLE = 8
|
||||
|
||||
AUTOCONFIGURE_MTU = False
|
||||
FIXED_MTU = False
|
||||
|
|
@ -131,11 +132,9 @@ class Interface:
|
|||
self.ic_burst_activated = time.time()
|
||||
return True
|
||||
|
||||
else:
|
||||
return False
|
||||
else: return False
|
||||
|
||||
else:
|
||||
return False
|
||||
else: return False
|
||||
|
||||
def optimise_mtu(self):
|
||||
if self.AUTOCONFIGURE_MTU:
|
||||
|
|
@ -191,8 +190,7 @@ class Interface:
|
|||
RNS.log("Releasing held announce packet "+str(selected_announce_packet)+" from "+str(self), RNS.LOG_EXTREME)
|
||||
self.ic_held_release = time.time() + self.ic_held_release_interval
|
||||
self.held_announces.pop(selected_announce_packet.destination_hash)
|
||||
def release():
|
||||
RNS.Transport.inbound(selected_announce_packet.raw, selected_announce_packet.receiving_interface)
|
||||
def release(): RNS.Transport.inbound(selected_announce_packet.raw, selected_announce_packet.receiving_interface)
|
||||
threading.Thread(target=release, daemon=True).start()
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -210,38 +208,24 @@ class Interface:
|
|||
self.parent_interface.sent_announce(from_spawned=True)
|
||||
|
||||
def incoming_announce_frequency(self):
|
||||
if not len(self.ia_freq_deque) > 1:
|
||||
return 0
|
||||
n = len(self.ia_freq_deque)
|
||||
if not n > self.IC_DEQUE_MIN_SAMPLE: return 0
|
||||
else:
|
||||
dq_len = len(self.ia_freq_deque)
|
||||
delta_sum = 0
|
||||
for i in range(1,dq_len):
|
||||
delta_sum += self.ia_freq_deque[i]-self.ia_freq_deque[i-1]
|
||||
delta_sum += time.time() - self.ia_freq_deque[dq_len-1]
|
||||
|
||||
if delta_sum == 0:
|
||||
avg = 0
|
||||
else:
|
||||
avg = 1/(delta_sum/(dq_len))
|
||||
|
||||
return avg
|
||||
oldest = self.ia_freq_deque[0]
|
||||
span = time.time() - oldest
|
||||
if span <= 0: return 0
|
||||
hz = n / span
|
||||
return hz
|
||||
|
||||
def outgoing_announce_frequency(self):
|
||||
if not len(self.oa_freq_deque) > 1:
|
||||
return 0
|
||||
n = len(self.oa_freq_deque)
|
||||
if not len(self.oa_freq_deque) > 1: return 0
|
||||
else:
|
||||
dq_len = len(self.oa_freq_deque)
|
||||
delta_sum = 0
|
||||
for i in range(1,dq_len):
|
||||
delta_sum += self.oa_freq_deque[i]-self.oa_freq_deque[i-1]
|
||||
delta_sum += time.time() - self.oa_freq_deque[dq_len-1]
|
||||
|
||||
if delta_sum == 0:
|
||||
avg = 0
|
||||
else:
|
||||
avg = 1/(delta_sum/(dq_len))
|
||||
|
||||
return avg
|
||||
oldest = self.oa_freq_deque[0]
|
||||
span = time.time() - oldest
|
||||
if span <= 0: return 0
|
||||
hz = n / span
|
||||
return hz
|
||||
|
||||
def process_announce_queue(self):
|
||||
if not hasattr(self, "announce_cap"):
|
||||
|
|
|
|||
|
|
@ -579,7 +579,16 @@ class TCPServerInterface(Interface):
|
|||
spawned_interface = TCPClientInterface(self.owner, spawned_configuration, connected_socket=handler.request)
|
||||
spawned_interface.OUT = self.OUT
|
||||
spawned_interface.IN = self.IN
|
||||
|
||||
spawned_interface.ingress_control = self.ingress_control
|
||||
spawned_interface.ic_max_held_announces = self.ic_max_held_announces
|
||||
spawned_interface.ic_burst_hold = self.ic_burst_hold
|
||||
spawned_interface.ic_burst_freq = self.ic_burst_freq
|
||||
spawned_interface.ic_burst_freq_new = self.ic_burst_freq_new
|
||||
spawned_interface.ic_new_time = self.ic_new_time
|
||||
spawned_interface.ic_burst_penalty = self.ic_burst_penalty
|
||||
spawned_interface.ic_held_release_interval = self.ic_held_release_interval
|
||||
|
||||
spawned_interface.target_ip = handler.client_address[0]
|
||||
spawned_interface.target_port = str(handler.client_address[1])
|
||||
spawned_interface.parent_interface = self
|
||||
|
|
|
|||
|
|
@ -942,6 +942,16 @@ class WeaveInterface(Interface):
|
|||
spawned_interface = WeaveInterfacePeer(self, endpoint_addr)
|
||||
spawned_interface.OUT = self.OUT
|
||||
spawned_interface.IN = self.IN
|
||||
|
||||
spawned_interface.ingress_control = self.ingress_control
|
||||
spawned_interface.ic_max_held_announces = self.ic_max_held_announces
|
||||
spawned_interface.ic_burst_hold = self.ic_burst_hold
|
||||
spawned_interface.ic_burst_freq = self.ic_burst_freq
|
||||
spawned_interface.ic_burst_freq_new = self.ic_burst_freq_new
|
||||
spawned_interface.ic_new_time = self.ic_new_time
|
||||
spawned_interface.ic_burst_penalty = self.ic_burst_penalty
|
||||
spawned_interface.ic_held_release_interval = self.ic_held_release_interval
|
||||
|
||||
spawned_interface.parent_interface = self
|
||||
spawned_interface.bitrate = self.bitrate
|
||||
|
||||
|
|
|
|||
30
RNS/Link.py
30
RNS/Link.py
|
|
@ -1300,10 +1300,8 @@ class Link:
|
|||
:param resource_strategy: One of ``RNS.Link.ACCEPT_NONE``, ``RNS.Link.ACCEPT_ALL`` or ``RNS.Link.ACCEPT_APP``. If ``RNS.Link.ACCEPT_APP`` is set, the `resource_callback` will be called to determine whether the resource should be accepted or not.
|
||||
:raises: *TypeError* if the resource strategy is unsupported.
|
||||
"""
|
||||
if not resource_strategy in Link.resource_strategies:
|
||||
raise TypeError("Unsupported resource strategy")
|
||||
else:
|
||||
self.resource_strategy = resource_strategy
|
||||
if not resource_strategy in Link.resource_strategies: raise TypeError("Unsupported resource strategy")
|
||||
else: self.resource_strategy = resource_strategy
|
||||
|
||||
def register_outgoing_resource(self, resource):
|
||||
self.outgoing_resources.append(resource)
|
||||
|
|
@ -1313,8 +1311,7 @@ class Link:
|
|||
|
||||
def has_incoming_resource(self, resource):
|
||||
for incoming_resource in self.incoming_resources:
|
||||
if incoming_resource.hash == resource.hash:
|
||||
return True
|
||||
if incoming_resource.hash == resource.hash: return True
|
||||
|
||||
return False
|
||||
|
||||
|
|
@ -1325,25 +1322,18 @@ class Link:
|
|||
return self.last_resource_eifr
|
||||
|
||||
def cancel_outgoing_resource(self, resource):
|
||||
if resource in self.outgoing_resources:
|
||||
self.outgoing_resources.remove(resource)
|
||||
else:
|
||||
RNS.log("Attempt to cancel a non-existing outgoing resource", RNS.LOG_ERROR)
|
||||
if resource in self.outgoing_resources: self.outgoing_resources.remove(resource)
|
||||
else: RNS.log("Attempt to cancel a non-existing outgoing resource", RNS.LOG_ERROR)
|
||||
|
||||
def cancel_incoming_resource(self, resource):
|
||||
if resource in self.incoming_resources:
|
||||
self.incoming_resources.remove(resource)
|
||||
else:
|
||||
RNS.log("Attempt to cancel a non-existing incoming resource", RNS.LOG_ERROR)
|
||||
if resource in self.incoming_resources: self.incoming_resources.remove(resource)
|
||||
else: RNS.log("Attempt to cancel a non-existing incoming resource", RNS.LOG_ERROR)
|
||||
|
||||
def ready_for_new_resource(self):
|
||||
if len(self.outgoing_resources) > 0:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
if len(self.outgoing_resources) > 0: return False
|
||||
else: return True
|
||||
|
||||
def __str__(self):
|
||||
return RNS.prettyhexrep(self.link_id)
|
||||
def __str__(self): return RNS.prettyhexrep(self.link_id)
|
||||
|
||||
|
||||
class RequestReceipt():
|
||||
|
|
|
|||
|
|
@ -856,8 +856,7 @@ class Transport:
|
|||
if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval:
|
||||
Transport.prioritize_interfaces()
|
||||
try:
|
||||
for interface in Transport.interfaces:
|
||||
interface.process_held_announces()
|
||||
for interface in Transport.interfaces: interface.process_held_announces()
|
||||
Transport.interface_last_jobs = time.time()
|
||||
except Exception as e:
|
||||
RNS.log(f"Error while processing held per-interface announces: {e}", RNS.LOG_WARNING)
|
||||
|
|
|
|||
|
|
@ -225,6 +225,7 @@ def prettysize(num, suffix='B'):
|
|||
return "%.2f%s%s" % (num, last_unit, suffix)
|
||||
|
||||
def prettyfrequency(hz, suffix="Hz"):
|
||||
if hz == 0: return "0 Hz"
|
||||
num = hz*1e6
|
||||
units = ["µ", "m", "", "K","M","G","T","P","E","Z"]
|
||||
last_unit = "Y"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue