From 434267784d65682b24782eed73f00666468ed02e Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 31 Oct 2025 13:53:59 +0100 Subject: [PATCH] Implemented propagation node peering key generation and peering cost signalling --- LXMF/Handlers.py | 3 + LXMF/LXMF.py | 16 ++--- LXMF/LXMPeer.py | 133 ++++++++++++++++++++++++++++++++++------- LXMF/LXMRouter.py | 126 ++++++++++++++++++++++---------------- LXMF/LXStamper.py | 31 ++++++---- LXMF/Utilities/lxmd.py | 44 +++++++++++++- 6 files changed, 257 insertions(+), 96 deletions(-) diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index fc980c3..f55cc76 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -51,6 +51,7 @@ class LXMFPropagationAnnounceHandler: propagation_sync_limit = int(data[4]) propagation_stamp_cost = int(data[5][0]) propagation_stamp_cost_flexibility = int(data[5][1]) + peering_cost = int(data[5][2]) metadata = data[6] if destination_hash in self.lxmrouter.static_peers: @@ -60,6 +61,7 @@ class LXMFPropagationAnnounceHandler: propagation_sync_limit=propagation_sync_limit, propagation_stamp_cost=propagation_stamp_cost, propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility, + peering_cost=peering_cost, metadata=metadata) else: @@ -72,6 +74,7 @@ class LXMFPropagationAnnounceHandler: propagation_sync_limit=propagation_sync_limit, propagation_stamp_cost=propagation_stamp_cost, propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility, + peering_cost=peering_cost, metadata=metadata) else: diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index b608ceb..3a20b0e 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -154,18 +154,20 @@ def pn_announce_data_is_valid(data): if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data") else: try: int(data[1]) - except: raise ValueError("Invalid announce data: Could not decode peer timebase") + except: raise ValueError("Invalid announce data: Could not decode timebase") if data[2] != True and data[2] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status") try: int(data[3]) - except: raise ValueError("Invalid announce data: Could not decode peer propagation transfer limit") + except: raise ValueError("Invalid announce data: Could not decode propagation transfer limit") try: int(data[4]) - except: raise ValueError("Invalid announce data: Could not decode peer propagation sync limit") - if type(data[4]) != list: raise ValueError("Invalid announce data: Could not decode peer stamp costs") + except: raise ValueError("Invalid announce data: Could not decode propagation sync limit") + if type(data[4]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs") try: int(data[5][0]) - except: raise ValueError("Invalid announce data: Could not decode peer target stamp cost") + except: raise ValueError("Invalid announce data: Could not decode target stamp cost") try: int(data[5][1]) - except: raise ValueError("Invalid announce data: Could not decode peer stamp cost flexibility") - if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode peer metadata") + except: raise ValueError("Invalid announce data: Could not decode stamp cost flexibility") + try: int(data[5][2]) + except: raise ValueError("Invalid announce data: Could not decode peering cost") + if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode metadata") except Exception as e: RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index e767313..0fe1e74 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -1,8 +1,10 @@ import os import time +import threading import RNS import RNS.vendor.umsgpack as msgpack +import LXMF.LXStamper as LXStamper from collections import deque from .LXMF import APP_NAME @@ -20,6 +22,7 @@ class LXMPeer: ERROR_NO_IDENTITY = 0xf0 ERROR_NO_ACCESS = 0xf1 + ERROR_THROTTLED = 0xf2 ERROR_TIMEOUT = 0xfe STRATEGY_LAZY = 0x01 @@ -80,6 +83,11 @@ class LXMPeer: except: peer.propagation_stamp_cost_flexibility = None else: peer.propagation_stamp_cost_flexibility = None + if "peering_cost" in dictionary: + try: peer.peering_cost = int(dictionary["peering_cost"]) + except: peer.peering_cost = None + else: peer.peering_cost = None + if "sync_strategy" in dictionary: try: peer.sync_strategy = int(dictionary["sync_strategy"]) except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY @@ -97,6 +105,8 @@ class LXMPeer: else: peer.tx_bytes = 0 if "last_sync_attempt" in dictionary: peer.last_sync_attempt = dictionary["last_sync_attempt"] else: peer.last_sync_attempt = 0 + if "peering_key" in dictionary: peer.peering_key = dictionary["peering_key"] + else: peer.peering_key = None hm_count = 0 for transient_id in dictionary["handled_ids"]: @@ -123,6 +133,8 @@ class LXMPeer: dictionary["peering_timebase"] = self.peering_timebase dictionary["alive"] = self.alive dictionary["last_heard"] = self.last_heard + dictionary["sync_strategy"] = self.sync_strategy + dictionary["peering_key"] = self.peering_key dictionary["destination_hash"] = self.destination_hash dictionary["link_establishment_rate"] = self.link_establishment_rate dictionary["sync_transfer_rate"] = self.sync_transfer_rate @@ -130,7 +142,7 @@ class LXMPeer: dictionary["propagation_sync_limit"] = self.propagation_sync_limit dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility - dictionary["sync_strategy"] = self.sync_strategy + dictionary["peering_cost"] = self.peering_cost dictionary["last_sync_attempt"] = self.last_sync_attempt dictionary["offered"] = self.offered dictionary["outgoing"] = self.outgoing @@ -155,16 +167,18 @@ class LXMPeer: return peer_bytes def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY): - self.alive = False - self.last_heard = 0 + self.alive = False + self.last_heard = 0 self.sync_strategy = sync_strategy + self.peering_key = None + self.peering_cost = None - self.next_sync_attempt = 0 - self.last_sync_attempt = 0 - self.sync_backoff = 0 - self.peering_timebase = 0 + self.next_sync_attempt = 0 + self.last_sync_attempt = 0 + self.sync_backoff = 0 + self.peering_timebase = 0 self.link_establishment_rate = 0 - self.sync_transfer_rate = 0 + self.sync_transfer_rate = 0 self.propagation_transfer_limit = None self.propagation_sync_limit = None @@ -185,6 +199,8 @@ class LXMPeer: self._hm_counts_synced = False self._um_counts_synced = False + self._peering_key_lock = threading.Lock() + self.link = None self.state = LXMPeer.IDLE @@ -199,11 +215,74 @@ class LXMPeer: self.destination = None RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING) + def peering_key_ready(self): + if not self.peering_cost: return False + if type(self.peering_key) == list and len(self.peering_key) == 2: + value = self.peering_key[1] + if value >= self.peering_cost: return True + else: + RNS.log(f"Peering key value mismatch for {self}. Current value is {value}, but peer requires {self.peering_cost}. Scheduling regeneration...", RNS.LOG_WARNING) + self.peering_key = None + + return False + + def peering_key_value(self): + if type(self.peering_key) == list and len(self.peering_key) == 2: return self.peering_key[1] + else: return None + + def generate_peering_key(self): + if self.peering_cost == None: return False + with self._peering_key_lock: + if self.peering_key != None: return True + else: + RNS.log(f"Generating peering key for {self}", RNS.LOG_NOTICE) + if self.router.identity == None: + RNS.log(f"Could not update peering key for {self} since the local LXMF router identity is not configured", RNS.LOG_ERROR) + return False + + if self.identity == None: + self.identity = RNS.Identity.recall(destination_hash) + if self.identity == None: + RNS.log(f"Could not update peering key for {self} since its identity could not be recalled", RNS.LOG_ERROR) + return False + + key_material = self.identity.hash+self.router.identity.hash + peering_key, value = LXStamper.generate_stamp(key_material, self.peering_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING) + if value >= self.peering_cost: + self.peering_key = [peering_key, value] + RNS.log(f"Peering key successfully generated for {self}", RNS.LOG_NOTICE) + return True + + return False + def sync(self): RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) self.last_sync_attempt = time.time() - if time.time() > self.next_sync_attempt: + sync_time_reached = time.time() > self.next_sync_attempt + stamp_costs_known = self.propagation_stamp_cost != None and self.propagation_stamp_cost_flexibility != None and self.peering_cost != None + peering_key_ready = self.peering_key_ready() + sync_checks = sync_time_reached and stamp_costs_known and peering_key_ready + + if not sync_checks: + try: + if not sync_time_reached: + postpone_reason = " due to previous failures" + if self.last_sync_attempt > self.last_heard: self.alive = False + elif not stamp_costs_known: + postpone_reason = " since its required stamp costs are not yet known" + elif not peering_key_ready: + postpone_reason = " since a peering key has not been generated yet" + def job(): self.generate_peering_key() + threading.Thread(target=job, daemon=True).start() + + delay = self.next_sync_attempt-time.time() + postpone_delay = " for {RNS.prettytime({delay})}" if delay > 0 else "" + RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG) + except Exception as e: + RNS.trace_exception(e) + + else: if not RNS.Transport.has_path(self.destination_hash): RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG) RNS.Transport.request_path(self.destination_hash) @@ -219,6 +298,10 @@ class LXMPeer: self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") if self.destination != None: + if len(self.unhandled_messages) == 0: + RNS.log(f"Sync requested for {self}, but no unhandled messages exist for peer. Sync complete.", RNS.LOG_DEBUG) + return + if len(self.unhandled_messages) > 0: if self.currently_transferring_messages != None: RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. Aborting.", RNS.LOG_ERROR) @@ -236,23 +319,31 @@ class LXMPeer: self.alive = True self.last_heard = time.time() self.sync_backoff = 0 + min_accepted_cost = min(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) - RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) + RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing sync offer...", RNS.LOG_DEBUG) unhandled_entries = [] - unhandled_ids = [] - purged_ids = [] + unhandled_ids = [] + purged_ids = [] + low_value_ids = [] for transient_id in self.unhandled_messages: if transient_id in self.router.propagation_entries: - unhandled_entry = [ transient_id, - self.router.get_weight(transient_id), - self.router.get_size(transient_id) ] - - unhandled_entries.append(unhandled_entry) + if self.router.get_stamp_value(transient_id) < min_accepted_cost: low_value_ids.append(transient_id) + else: + unhandled_entry = [ transient_id, + self.router.get_weight(transient_id), + self.router.get_size(transient_id) ] + + unhandled_entries.append(unhandled_entry) else: purged_ids.append(transient_id) for transient_id in purged_ids: - RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) + RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since it no longer exists in the message store.", RNS.LOG_DEBUG) + self.remove_unhandled_message(transient_id) + + for transient_id in low_value_ids: + RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since its stamp value is lower than peer requirement of {min_accepted_cost}.", RNS.LOG_DEBUG) self.remove_unhandled_message(transient_id) unhandled_entries.sort(key=lambda e: e[1], reverse=False) @@ -284,11 +375,7 @@ class LXMPeer: self.state = LXMPeer.REQUEST_SENT else: - RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) - - else: - RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG) - if self.last_sync_attempt > self.last_heard: self.alive = False + RNS.log(f"Could not request sync to peer {RNS.prettyhexrep(self.destination_hash)} since its identity could not be recalled.", RNS.LOG_ERROR) def request_failed(self, request_receipt): RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index d4108fb..ce6b685 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -45,9 +45,11 @@ class LXMRouter: ROTATION_HEADROOM_PCT = 10 ROTATION_AR_MAX = 0.5 - PROPAGATION_COST = 12 - PROPAGATION_COST_MIN = 10 + PEERING_COST = 10 + MAX_PEERING_COST = 12 + PROPAGATION_COST_MIN = 13 PROPAGATION_COST_FLEX = 3 + PROPAGATION_COST = 16 PROPAGATION_LIMIT = 256 SYNC_LIMIT = PROPAGATION_LIMIT*40 DELIVERY_LIMIT = 1000 @@ -81,7 +83,8 @@ class LXMRouter: propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT, enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT, - propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX): + propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX, + peering_cost=PEERING_COST): random.seed(os.urandom(10)) @@ -115,17 +118,20 @@ class LXMRouter: self.outbound_propagation_link = None if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT + if propagation_cost < LXMRouter.PROPAGATION_COST_MIN: propagation_cost = LXMRouter.PROPAGATION_COST_MIN - self.message_storage_limit = None - self.information_storage_limit = None - self.propagation_per_transfer_limit = propagation_limit - self.propagation_per_sync_limit = sync_limit - self.delivery_per_transfer_limit = delivery_limit - self.propagation_stamp_cost = propagation_cost + self.message_storage_limit = None + self.information_storage_limit = None + self.propagation_per_transfer_limit = propagation_limit + self.propagation_per_sync_limit = sync_limit + self.delivery_per_transfer_limit = delivery_limit + self.propagation_stamp_cost = propagation_cost self.propagation_stamp_cost_flexibility = propagation_cost_flexibility - self.enforce_ratchets = enforce_ratchets - self._enforce_stamps = enforce_stamps - self.pending_deferred_stamps = {} + self.peering_cost = peering_cost + self.max_peering_cost = LXMRouter.MAX_PEERING_COST + self.enforce_ratchets = enforce_ratchets + self._enforce_stamps = enforce_stamps + self.pending_deferred_stamps = {} if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit: self.propagation_per_sync_limit = self.propagation_per_transfer_limit @@ -284,7 +290,7 @@ class LXMRouter: def delayed_announce(): time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) node_state = self.propagation_node and not self.from_static_only - stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility] + stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost] metadata = {} announce_data = [ False, # 0: Legacy LXMF PN support int(time.time()), # 1: Current node timebase @@ -719,6 +725,8 @@ class LXMRouter: "sync_limit": peer.propagation_sync_limit, "target_stamp_cost": peer.propagation_stamp_cost, "stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility, + "peering_cost": peer.peering_cost, + "peering_key": peer.peering_key_value(), "network_distance": RNS.Transport.hops_to(peer_id), "rx_bytes": peer.rx_bytes, "tx_bytes": peer.tx_bytes, @@ -739,6 +747,8 @@ class LXMRouter: "sync_limit": self.propagation_per_sync_limit, "target_stamp_cost": self.propagation_stamp_cost, "stamp_cost_flexibility": self.propagation_stamp_cost_flexibility, + "peering_cost": self.peering_cost, + "max_peering_cost": self.max_peering_cost, "autopeer_maxdepth": self.autopeer_maxdepth, "from_static_only": self.from_static_only, "messagestore": { @@ -1782,39 +1792,48 @@ class LXMRouter: ### Peer Sync & Propagation ########################### ####################################################### - def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility): - if destination_hash in self.peers: - peer = self.peers[destination_hash] - if timestamp > peer.peering_timebase: - peer.alive = True - peer.sync_backoff = 0 - peer.next_sync_attempt = 0 - peer.peering_timebase = timestamp - peer.last_heard = time.time() - peer.propagation_stamp_cost = propagation_stamp_cost - peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility - peer.propagation_transfer_limit = propagation_transfer_limit - if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit - else: peer.propagation_sync_limit = propagation_transfer_limit - - RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) - - else: - if len(self.peers) < self.max_peers: - peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy) - peer.alive = True - peer.last_heard = time.time() - peer.propagation_stamp_cost = propagation_stamp_cost - peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility - peer.propagation_transfer_limit = propagation_transfer_limit - if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit - else: peer.propagation_sync_limit = propagation_transfer_limit - - self.peers[destination_hash] = peer - RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE) - + def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility, peering_cost, metadata): + if peering_cost > self.max_peering_cost: + if destination_hash in self.peers: + RNS.log(f"Peer {RNS.prettyhexrep(destination_hash)} increased peering cost beyond local accepted maximum, breaking peering...", RNS.LOG_NOTICE) + self.unpeer(destination_hash, timestamp) else: - RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) + RNS.log(f"Not peering with {RNS.prettyhexrep(destination_hash)}, since its peering cost of {peering_cost} exceeds local maximum of {self.max_peering_cost}", RNS.LOG_NOTICE) + + else: + if destination_hash in self.peers: + peer = self.peers[destination_hash] + if timestamp > peer.peering_timebase: + peer.alive = True + peer.sync_backoff = 0 + peer.next_sync_attempt = 0 + peer.peering_timebase = timestamp + peer.last_heard = time.time() + peer.propagation_stamp_cost = propagation_stamp_cost + peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility + peer.peering_cost = peering_cost + peer.propagation_transfer_limit = propagation_transfer_limit + if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit + else: peer.propagation_sync_limit = propagation_transfer_limit + + RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE) + + else: + if len(self.peers) >= self.max_peers: RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) + else: + peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy) + peer.alive = True + peer.last_heard = time.time() + peer.propagation_stamp_cost = propagation_stamp_cost + peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility + peer.peering_cost = peering_cost + peer.propagation_transfer_limit = propagation_transfer_limit + if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit + else: peer.propagation_sync_limit = propagation_transfer_limit + + self.peers[destination_hash] = peer + RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE) + def unpeer(self, destination_hash, timestamp = None): if timestamp == None: @@ -2000,8 +2019,8 @@ class LXMRouter: ####################################### # TODO: Check propagation stamps here # ####################################### - target_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) - validated_messages = LXStamper.validate_pn_stamps(messages, target_cost) + min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) + validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost) for validated_entry in validated_messages: lxmf_data = validated_entry[1] @@ -2077,7 +2096,7 @@ class LXMRouter: # 2: Boolean flag signalling propagation node state # 3: Per-transfer limit for message propagation in kilobytes # 4: Limit for incoming propagation node syncs - # 5: Propagation stamp cost for this node + # 5: Propagation stamp costs for this node # 6: Node metadata if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: remote_timebase = remote_app_data[1] @@ -2085,10 +2104,11 @@ class LXMRouter: remote_sync_limit = remote_app_data[4] remote_stamp_cost = remote_app_data[5][0] remote_stamp_flex = remote_app_data[5][1] + remote_peering_cost = remote_app_data[5][2] remote_metadata = remote_app_data[6] RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug - self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_metadata) + self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata) ms = "" if len(messages) == 1 else "s" RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE) @@ -2096,8 +2116,8 @@ class LXMRouter: ####################################### # TODO: Check propagation stamps here # ####################################### - target_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) - validated_messages = LXStamper.validate_pn_stamps(messages, target_cost) + min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) + validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost) for validated_entry in validated_messages: transient_id = validated_entry[0] @@ -2177,13 +2197,13 @@ class LXMRouter: msg_file = open(file_path, "wb") msg_file.write(lxmf_data); msg_file.close() - RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME) + RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)} with stamp value {stamp_value}, adding to peer distribution queues...", RNS.LOG_EXTREME) self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], [], stamp_value] self.enqueue_peer_distribution(transient_id, from_peer) else: # TODO: Add message to sneakernet queues when implemented - RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG) + RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)}, but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG) return True diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 9d85329..4d2e38c 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -7,23 +7,24 @@ import math import itertools import multiprocessing -WORKBLOCK_EXPAND_ROUNDS = 3000 -WORKBLOCK_EXPAND_ROUNDS_PN = 1000 -STAMP_SIZE = RNS.Identity.HASHLENGTH -PN_VALIDATION_POOL_MIN_SIZE = 256 +WORKBLOCK_EXPAND_ROUNDS = 3000 +WORKBLOCK_EXPAND_ROUNDS_PEERING = 20000 +WORKBLOCK_EXPAND_ROUNDS_PN = 1000 +STAMP_SIZE = RNS.Identity.HASHLENGTH +PN_VALIDATION_POOL_MIN_SIZE = 256 active_jobs = {} -def stamp_workblock(message_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): +def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): wb_st = time.time() workblock = b"" for n in range(expand_rounds): workblock += RNS.Cryptography.hkdf(length=256, - derive_from=message_id, - salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)), + derive_from=material, + salt=RNS.Identity.full_hash(material+msgpack.packb(n)), context=None) wb_time = time.time() - wb_st - # RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG) + RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG) return workblock @@ -81,9 +82,9 @@ def validate_pn_stamps(transient_list, target_cost): if len(transient_list) <= PN_VALIDATION_POOL_MIN_SIZE or non_mp_platform: return validate_pn_stamps_job_simple(transient_list, target_cost) else: return validate_pn_stamps_job_multip(transient_list, target_cost) -def generate_stamp(message_id, stamp_cost): +def generate_stamp(message_id, stamp_cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG) - workblock = stamp_workblock(message_id) + workblock = stamp_workblock(message_id, expand_rounds=expand_rounds) start_time = time.time() stamp = None @@ -362,4 +363,12 @@ if __name__ == "__main__": RNS.loglevel = RNS.LOG_DEBUG RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG) message_id = os.urandom(32) - generate_stamp(message_id, cost) \ No newline at end of file + generate_stamp(message_id, cost) + + RNS.log("Testing propagation stamp generation", RNS.LOG_DEBUG) + message_id = os.urandom(32) + generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN) + + RNS.log("Testing peering key generation", RNS.LOG_DEBUG) + message_id = os.urandom(32) + generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING) \ No newline at end of file diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index 03d1282..b2bc302 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -164,6 +164,20 @@ def apply_config(): else: active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX + if "propagation" in lxmd_config and "peering_cost" in lxmd_config["propagation"]: + active_configuration["peering_cost"] = lxmd_config["propagation"].as_int("peering_cost") + if active_configuration["peering_cost"] < 0: + active_configuration["peering_cost"] = 0 + else: + active_configuration["peering_cost"] = LXMF.LXMRouter.PEERING_COST + + if "propagation" in lxmd_config and "remote_peering_cost_max" in lxmd_config["propagation"]: + active_configuration["remote_peering_cost_max"] = lxmd_config["propagation"].as_int("remote_peering_cost_max") + if active_configuration["remote_peering_cost_max"] < 0: + active_configuration["remote_peering_cost_max"] = 0 + else: + active_configuration["remote_peering_cost_max"] = LXMF.LXMRouter.MAX_PEERING_COST + if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") else: @@ -579,9 +593,11 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"] cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"] psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"] + pc = s["peering_cost"]; pcm = s["max_peering_cost"] print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})") print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit") print(f"Required propagation stamp cost is {psc}, flexibility is {scf}") + print(f"Peering cost is {pc}, max remote peering cost is {pcm}") print(f"") print(f"Peers : {stp} total (peer limit is {smp})") print(f" {sdp} discovered, {ssp} static") @@ -613,7 +629,13 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = h = max(time.time()-p["last_heard"], 0) hops = p["network_distance"] hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away" - pm = p["messages"] + pm = p["messages"]; pk = p["peering_key"] + pc = p["peering_cost"]; psc = p["target_stamp_cost"]; psf = p["stamp_cost_flexibility"] + if pc == None: pc = "unknown" + if psc == None: psc = "unknown" + if psf == None: psf = "unknown" + if pk == None: pk = "Not generated" + else: pk = f"Generated, value is {pk}" if p["last_sync_attempt"] != 0: lsa = p["last_sync_attempt"] ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago" @@ -622,9 +644,11 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness = sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000) srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"] - pmi = pm["incoming"]; pmuh = pm["unhandled"] + pmi = pm["incoming"]; pmuh = pm["unhandled"] print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}") print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago") + print(f"{ind*2}Costs : Propagation {psc} (flex {psf}), peering {pc}") + print(f"{ind*2}Sync key : {pk}") print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit") print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming") print(f"{ind*2}Traffic : {srxb} received, {stxb} sent") @@ -752,6 +776,22 @@ autopeer_maxdepth = 4 # propagation_stamp_cost_flexibility = 3 +# The peering_cost option configures the target +# value required for a remote node to peer with +# and deliver messages to this node. + +# peering_cost = 10 + +# You can configure the maximum peering cost +# of remote nodes that this node will peer with. +# Setting this to a higher number will allow +# this node to peer with other nodes requiring +# a high peering key value, but will require +# more computation time during initial peering +# when generating the peering key. + +# remote_peering_cost_max = 12 + # You can tell the LXMF message router to # prioritise storage for one or more # destinations. If the message store reaches