diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index 571a59f..c09bf1c 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -37,7 +37,10 @@ class LXMFPropagationAnnounceHandler: node_timebase = data[1] propagation_transfer_limit = None if len(data) >= 3: - propagation_transfer_limit = data[2] + try: + propagation_transfer_limit = float(data[2]) + except: + propagation_transfer_limit = None if data[0] == True: if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 97fb747..5ee8986 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -48,6 +48,14 @@ class LXMPeer: else: peer.link_establishment_rate = 0 + if "propagation_transfer_limit" in dictionary: + try: + peer.propagation_transfer_limit = float(dictionary["propagation_transfer_limit"]) + except Exception as e: + peer.propagation_transfer_limit = None + else: + peer.propagation_transfer_limit = None + for transient_id in dictionary["handled_ids"]: if transient_id in router.propagation_entries: peer.handled_messages[transient_id] = router.propagation_entries[transient_id] @@ -65,6 +73,7 @@ class LXMPeer: dictionary["last_heard"] = self.last_heard dictionary["destination_hash"] = self.destination_hash dictionary["link_establishment_rate"] = self.link_establishment_rate + dictionary["propagation_transfer_limit"] = self.propagation_transfer_limit handled_ids = [] for transient_id in self.handled_messages: @@ -87,12 +96,14 @@ class LXMPeer: self.sync_backoff = 0 self.peering_timebase = 0 self.link_establishment_rate = 0 + self.propagation_transfer_limit = None self.link = None self.state = LXMPeer.IDLE self.unhandled_messages = {} self.handled_messages = {} + self.last_offer = [] self.router = router self.destination_hash = destination_hash @@ -133,11 +144,17 @@ class LXMPeer: self.sync_backoff = 0 RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) + unhandled_entries = [] unhandled_ids = [] purged_ids = [] for transient_id in self.unhandled_messages: if transient_id in self.router.propagation_entries: - unhandled_ids.append(transient_id) + unhandled_entry = [ + transient_id, + self.router.get_weight(transient_id), + self.router.get_size(transient_id), + ] + unhandled_entries.append(unhandled_entry) else: purged_ids.append(transient_id) @@ -145,8 +162,21 @@ class LXMPeer: RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) self.unhandled_messages.pop(transient_id) + unhandled_entries.sort(key=lambda e: e[1], reverse=False) + cumulative_size = 0 + for unhandled_entry in unhandled_entries: + transient_id = unhandled_entry[0] + weight = unhandled_entry[1] + lxm_size = unhandled_entry[2] + if self.propagation_transfer_limit != None and cumulative_size + lxm_size > (self.propagation_transfer_limit*1000): + pass + else: + cumulative_size += lxm_size + unhandled_ids.append(transient_id) + RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) - self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) + self.last_offer = unhandled_ids + self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed) self.state = LXMPeer.REQUEST_SENT else: @@ -175,33 +205,31 @@ class LXMPeer: if response == LXMPeer.ERROR_NO_IDENTITY: if self.link != None: RNS.log("Remote peer indicated that no identification was received, retrying...", RNS.LOG_DEBUG) - self.link.indentify() + self.link.identify() self.state = LXMPeer.LINK_READY self.sync() elif response == False: # Peer already has all advertised messages - for transient_id in self.unhandled_messages: - message_entry = self.unhandled_messages[transient_id] - self.handled_messages[transient_id] = message_entry - - self.unhandled_messages = {} + for transient_id in self.last_offer: + if transient_id in self.unhandled_messages: + self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id) + elif response == True: # Peer wants all advertised messages - for transient_id in self.unhandled_messages: + for transient_id in self.last_offer: wanted_messages.append(self.unhandled_messages[transient_id]) wanted_message_ids.append(transient_id) else: # Peer wants some advertised messages - peer_had_messages = [] - for transient_id in self.unhandled_messages.copy(): + for transient_id in self.last_offer.copy(): # If the peer did not want the message, it has # already received it from another peer. if not transient_id in response: - message_entry = self.unhandled_messages.pop(transient_id) - self.handled_messages[transient_id] = message_entry + if transient_id in self.unhandled_messages: + self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id) for transient_id in response: wanted_messages.append(self.unhandled_messages[transient_id]) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 04263a0..5141bc6 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -31,30 +31,35 @@ class LXMRouter: AUTOPEER_MAXDEPTH = 4 FASTEST_N_RANDOM_POOL = 2 - PR_PATH_TIMEOUT = 10 + PROPAGATION_LIMIT = 256 + DELIVERY_LIMIT = 1024 - PR_IDLE = 0x00 - PR_PATH_REQUESTED = 0x01 - PR_LINK_ESTABLISHING = 0x02 - PR_LINK_ESTABLISHED = 0x03 - PR_REQUEST_SENT = 0x04 - PR_RECEIVING = 0x05 - PR_RESPONSE_RECEIVED = 0x06 - PR_COMPLETE = 0x07 - PR_NO_PATH = 0xf0 - PR_LINK_FAILED = 0xf1 - PR_TRANSFER_FAILED = 0xf2 - PR_NO_IDENTITY_RCVD = 0xf3 - PR_NO_ACCESS = 0xf4 - PR_FAILED = 0xfe + PR_PATH_TIMEOUT = 10 - PR_ALL_MESSAGES = 0x00 + PR_IDLE = 0x00 + PR_PATH_REQUESTED = 0x01 + PR_LINK_ESTABLISHING = 0x02 + PR_LINK_ESTABLISHED = 0x03 + PR_REQUEST_SENT = 0x04 + PR_RECEIVING = 0x05 + PR_RESPONSE_RECEIVED = 0x06 + PR_COMPLETE = 0x07 + PR_NO_PATH = 0xf0 + PR_LINK_FAILED = 0xf1 + PR_TRANSFER_FAILED = 0xf2 + PR_NO_IDENTITY_RCVD = 0xf3 + PR_NO_ACCESS = 0xf4 + PR_FAILED = 0xfe + + PR_ALL_MESSAGES = 0x00 ### Developer-facing API ############################## ####################################################### - def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None): + def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None, + propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT): + random.seed(os.urandom(10)) self.pending_inbound = [] @@ -84,6 +89,8 @@ class LXMRouter: self.message_storage_limit = None self.information_storage_limit = None + self.propagation_per_transfer_limit = propagation_limit + self.delivery_per_transfer_limit = delivery_limit self.wants_download_on_path_available_from = None self.wants_download_on_path_available_to = None @@ -152,7 +159,13 @@ class LXMRouter: def announce_propagation_node(self): def delayed_announce(): time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) - data = msgpack.packb([self.propagation_node, int(time.time())]) + announce_data = [ + self.propagation_node, # Boolean flag signalling propagation node state + int(time.time()), # Current node timebase + self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes + ] + + data = msgpack.packb(announce_data) self.propagation_destination.announce(app_data=data) da_thread = threading.Thread(target=delayed_announce) @@ -319,7 +332,10 @@ class LXMRouter: peer = LXMPeer.from_bytes(serialised_peer, self) if peer.identity != None: self.peers[peer.destination_hash] = peer - RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages", RNS.LOG_DEBUG) + lim_str = ", no transfer limit" + if peer.propagation_transfer_limit != None: + lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit" + RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG) else: RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG) @@ -522,6 +538,28 @@ class LXMRouter: self.locally_processed_transient_ids.pop(transient_id) RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG) + def get_weight(self, transient_id): + dst_hash = self.propagation_entries[transient_id][0] + lxm_rcvd = self.propagation_entries[transient_id][2] + lxm_size = self.propagation_entries[transient_id][3] + + now = time.time() + age_weight = max(1, (now - lxm_rcvd)/60/60/24/4) + + if dst_hash in self.prioritised_list: + priority_weight = 0.1 + else: + priority_weight = 1.0 + + weight = priority_weight * age_weight * lxm_size + + return weight + + def get_size(self, transient_id): + lxm_size = self.propagation_entries[transient_id][3] + return lxm_size + + def clean_message_store(self): # Check and remove expired messages now = time.time() @@ -563,22 +601,13 @@ class LXMRouter: bytes_needed = message_storage_size - self.message_storage_limit bytes_cleaned = 0 - now = time.time() weighted_entries = [] for transient_id in self.propagation_entries: - entry = self.propagation_entries[transient_id] - - dst_hash = entry[0] - lxm_rcvd = entry[2] - lxm_size = entry[3] - age_weight = max(1, (now - lxm_rcvd)/60/60/24/4) - if dst_hash in self.prioritised_list: - priority_weight = 0.1 - else: - priority_weight = 1.0 - - weight = priority_weight * age_weight * lxm_size - weighted_entries.append([entry, weight, transient_id]) + weighted_entries.append([ + self.propagation_entries[transient_id], + self.get_weight(transient_id), + transient_id + ]) weighted_entries.sort(key=lambda we: we[1], reverse=True) @@ -961,7 +990,7 @@ class LXMRouter: ### Peer Sync & Propagation ########################### ####################################################### - def peer(self, destination_hash, timestamp): + def peer(self, destination_hash, timestamp, propagation_transfer_limit): if destination_hash in self.peers: peer = self.peers[destination_hash] if timestamp > peer.peering_timebase: @@ -970,11 +999,13 @@ class LXMRouter: peer.next_sync_attempt = 0 peer.peering_timebase = timestamp peer.last_heard = time.time() + peer.propagation_transfer_limit = propagation_transfer_limit else: peer = LXMPeer(self, destination_hash) peer.alive = True peer.last_heard = time.time() + peer.propagation_transfer_limit = propagation_transfer_limit self.peers[destination_hash] = peer RNS.log("Peered with "+str(peer.destination)) diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index 3dc34df..3a76dbb 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -77,6 +77,13 @@ def apply_config(): active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60 else: active_configuration["peer_announce_interval"] = None + + if "lxmf" in lxmd_config and "delivery_transfer_max_accepted_size" in lxmd_config["lxmf"]: + active_configuration["delivery_transfer_max_accepted_size"] = lxmd_config["lxmf"].as_float("delivery_transfer_max_accepted_size") + if active_configuration["delivery_transfer_max_accepted_size"] < 0.38: + active_configuration["delivery_transfer_max_accepted_size"] = 0.38 + else: + active_configuration["delivery_transfer_max_accepted_size"] = 1024 if "lxmf" in lxmd_config and "on_inbound" in lxmd_config["lxmf"]: active_configuration["on_inbound"] = lxmd_config["lxmf"]["on_inbound"] @@ -121,6 +128,13 @@ def apply_config(): else: active_configuration["message_storage_limit"] = 2000 + if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]: + active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size") + if active_configuration["propagation_transfer_max_accepted_size"] < 0.38: + active_configuration["propagation_transfer_max_accepted_size"] = 0.38 + else: + active_configuration["propagation_transfer_max_accepted_size"] = 256 + if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") else: @@ -289,6 +303,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo storagepath = storagedir, autopeer = active_configuration["autopeer"], autopeer_maxdepth = active_configuration["autopeer_maxdepth"], + propagation_limit = active_configuration["propagation_transfer_max_accepted_size"], + delivery_limit = active_configuration["delivery_transfer_max_accepted_size"], ) message_router.register_delivery_callback(lxmf_delivery) @@ -418,23 +434,41 @@ __default_lxmd_config__ = """# This is an example LXM Daemon config file. [propagation] # Whether to enable propagation node + enable_node = no # Automatic announce interval in minutes. # 6 hours by default. + announce_interval = 360 # Whether to announce when the node starts. + announce_at_start = yes # Wheter to automatically peer with other # propagation nodes on the network. + autopeer = yes # The maximum peering depth (in hops) for # automatically peered nodes. + autopeer_maxdepth = 4 +# The maximum accepted transfer size per in- +# coming propagation transfer, in kilobytes. +# This also sets the upper limit for the size +# of single messages accepted onto this node. +# +# If a node wants to propagate a larger number +# of messages to this node, than what can fit +# within this limit, it will prioritise sending +# the smallest messages first, and try again +# with any remaining messages at a later point. + +propagation_transfer_max_accepted_size = 256 + # The maximum amount of storage to use for # the LXMF Propagation Node message store, # specified in megabytes. When this limit @@ -444,6 +478,7 @@ autopeer_maxdepth = 4 # new and small. Large and old messages will # be removed first. This setting is optional # and defaults to 2 gigabytes. + # message_storage_limit = 2000 # You can tell the LXMF message router to @@ -453,6 +488,7 @@ autopeer_maxdepth = 4 # keeping messages for destinations specified # with this option. This setting is optional, # and generally you do not need to use it. + # prioritise_destinations = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf # By default, any destination is allowed to @@ -461,6 +497,7 @@ autopeer_maxdepth = 4 # authentication, you must provide a list of # allowed identity hashes in the a file named # "allowed" in the lxmd config directory. + auth_required = no @@ -469,23 +506,35 @@ auth_required = no # The LXM Daemon will create an LXMF destination # that it can receive messages on. This option sets # the announced display name for this destination. + display_name = Anonymous Peer # It is possible to announce the internal LXMF # destination when the LXM Daemon starts up. + announce_at_start = no # You can also announce the delivery destination # at a specified interval. This is not enabled by # default. + # announce_interval = 360 +# The maximum accepted unpacked size for mes- +# sages received directly from other peers, +# specified in kilobytes. Messages larger than +# this will be rejected before the transfer +# begins. + +delivery_transfer_max_accepted_size = 1024 + # You can configure an external program to be run # every time a message is received. The program # will receive as an argument the full path to the # message saved as a file. The example below will # simply result in the message getting deleted as # soon as it has been received. + # on_inbound = rm @@ -499,6 +548,7 @@ announce_at_start = no # 5: Verbose logging # 6: Debug logging # 7: Extreme logging + loglevel = 4 """