From c84aea745a682e1f834ee7a6f6a5d8023792fa6d Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 30 Oct 2025 21:19:38 +0100 Subject: [PATCH] Improved auto-peering on inbound PN sync. Added support for persisting and loading transient message stamp status. Implemented getting transient message stamp value. --- LXMF/LXMF.py | 3 +- LXMF/LXMPeer.py | 2 +- LXMF/LXMRouter.py | 143 ++++++++++++++++++++++++++-------------------- LXMF/LXStamper.py | 10 ++-- 4 files changed, 89 insertions(+), 69 deletions(-) diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 6369d69..b608ceb 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -149,7 +149,8 @@ def stamp_cost_from_app_data(app_data=None): def pn_announce_data_is_valid(data): try: - if type(data) == bytes: data = msgpack.unpackb(data) + if type(data) != bytes: return False + else: data = msgpack.unpackb(data) if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data") else: try: int(data[1]) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index e86ae84..e767313 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -259,7 +259,7 @@ class LXMPeer: per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now cumulative_size = 24 # Initialised to highest reasonable binary structure overhead RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug - + for unhandled_entry in unhandled_entries: transient_id = unhandled_entry[0] weight = unhandled_entry[1] diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 816d949..ace090a 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -15,6 +15,7 @@ import RNS.vendor.umsgpack as msgpack from .LXMF import APP_NAME from .LXMF import FIELD_TICKET +from .LXMF import pn_announce_data_is_valid from .LXMPeer import LXMPeer from .LXMessage import LXMessage @@ -285,13 +286,13 @@ class LXMRouter: node_state = self.propagation_node and not self.from_static_only stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility] metadata = {} - announce_data = [ False, # Legacy LXMF PN support - int(time.time()), # Current node timebase - node_state, # Boolean flag signalling propagation node state - self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes - self.propagation_per_sync_limit, # Limit for incoming propagation node syncs - stamp_cost, # Propagation stamp cost for this node - metadata ] # Node metadata + announce_data = [ False, # 0: Legacy LXMF PN support + int(time.time()), # 1: Current node timebase + node_state, # 2: Boolean flag signalling propagation node state + self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes + self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs + stamp_cost, # 5: Propagation stamp cost for this node + metadata ] # 6: Node metadata data = msgpack.packb(announce_data) self.propagation_destination.announce(app_data=data) @@ -486,19 +487,21 @@ class LXMRouter: st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE) for filename in os.listdir(self.messagepath): components = filename.split("_") - if len(components) == 2: + if len(components) >= 2: if float(components[1]) > 0: if len(components[0]) == RNS.Identity.HASHLENGTH//8*2: try: - transient_id = bytes.fromhex(components[0]) - received = float(components[1]) - - filepath = self.messagepath+"/"+filename - msg_size = os.path.getsize(filepath) - file = open(filepath, "rb") - destination_hash = file.read(LXMessage.DESTINATION_LENGTH) + transient_id = bytes.fromhex(components[0]) + received = float(components[1]) + filepath = self.messagepath+"/"+filename + msg_size = os.path.getsize(filepath) + file = open(filepath, "rb") + destination_hash = file.read(LXMessage.DESTINATION_LENGTH) file.close() + if len(components) >= 3: stamp_value = int(components[2]) + else: stamp_value = None + self.propagation_entries[transient_id] = [ destination_hash, # 0: Destination hash filepath, # 1: Storage location @@ -506,6 +509,7 @@ class LXMRouter: msg_size, # 3: Message size [], # 4: Handled peers [], # 5: Unhandled peers + stamp_value, # 6: Stamp value ] except Exception as e: @@ -923,22 +927,26 @@ class LXMRouter: return msgpack.packb(peer_data) - def get_weight(self, transient_id): - dst_hash = self.propagation_entries[transient_id][0] - lxm_rcvd = self.propagation_entries[transient_id][2] + def get_size(self, transient_id): lxm_size = self.propagation_entries[transient_id][3] + return lxm_size - now = time.time() + def get_weight(self, transient_id): + dst_hash = self.propagation_entries[transient_id][0] + lxm_rcvd = self.propagation_entries[transient_id][2] + lxm_size = self.propagation_entries[transient_id][3] + + now = time.time() age_weight = max(1, (now - lxm_rcvd)/60/60/24/4) - if dst_hash in self.prioritised_list: - priority_weight = 0.1 - else: - priority_weight = 1.0 + if dst_hash in self.prioritised_list: priority_weight = 0.1 + else: priority_weight = 1.0 - weight = priority_weight * age_weight * lxm_size + return priority_weight * age_weight * lxm_size - return weight + def get_stamp_value(self, transient_id): + if not transient_id in self.propagation_entries: return None + else: return self.propagation_entries[transient_id][6] def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY): now = time.time() @@ -1003,10 +1011,6 @@ class LXMRouter: else: return available_tickets - def get_size(self, transient_id): - lxm_size = self.propagation_entries[transient_id][3] - return lxm_size - def clean_message_store(self): RNS.log("Cleaning message store", RNS.LOG_VERBOSE) # Check and remove expired messages @@ -1993,18 +1997,22 @@ class LXMRouter: def propagation_packet(self, data, packet): try: - if packet.destination_type != RNS.Destination.LINK: - pass + if packet.destination_type != RNS.Destination.LINK: return else: - data = msgpack.unpackb(data) + data = msgpack.unpackb(data) remote_timebase = data[0] + messages = data[1] + + ####################################### + # TODO: Check propagation stamps here # + ####################################### + stamps_valid = False - messages = data[1] for lxmf_data in messages: self.lxmf_propagation(lxmf_data) self.client_propagation_messages_received += 1 - packet.prove() + if stamps_valid: packet.prove() except Exception as e: RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) @@ -2053,34 +2061,49 @@ class LXMRouter: if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list: # This is a series of propagation messages from a peer or originator - remote_timebase = data[0] - remote_hash = None - remote_str = "unknown peer" remote_identity = resource.link.get_remote_identity() + remote_timebase = data[0] + messages = data[1] + remote_hash = None + remote_str = "unknown peer" if remote_identity != None: remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - remote_hash = remote_destination.hash - remote_str = RNS.prettyhexrep(remote_hash) + remote_hash = remote_destination.hash + remote_app_data = RNS.Identity.recall_app_data(remote_hash) + remote_str = RNS.prettyhexrep(remote_hash) - if not remote_hash in self.peers: - if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: - # TODO: Query cache for an announce and get propagation - # transfer limit from that. For now, initialise it to a - # sane default value, and wait for an announce to arrive - # that will update the peering config to the actual limit. - propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4 - wanted_inbound_peers = None - self.peer(remote_hash, remote_timebase, propagation_transfer_limit, wanted_inbound_peers) + if remote_hash in self.peers: remote_str = f"peer {remote_str}" else: - remote_str = f"peer {remote_str}" + if pn_announce_data_is_valid(remote_app_data): + # 1: Current node timebase + # 2: Boolean flag signalling propagation node state + # 3: Per-transfer limit for message propagation in kilobytes + # 4: Limit for incoming propagation node syncs + # 5: Propagation stamp cost for this node + # 6: Node metadata + if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: + remote_timebase = remote_app_data[1] + remote_transfer_limit = remote_app_data[3] + remote_sync_limit = remote_app_data[4] + remote_stamp_cost = remote_app_data[5][0] + remote_stamp_flex = remote_app_data[5][1] + remote_metadata = remote_app_data[6] + + RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug + self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_metadata) - messages = data[1] ms = "" if len(messages) == 1 else "s" RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE) + + ####################################### + # TODO: Check propagation stamps here # + ####################################### + for lxmf_data in messages: peer = None transient_id = RNS.Identity.full_hash(lxmf_data) + if remote_hash != None and remote_hash in self.peers: peer = self.peers[remote_hash] peer.incoming += 1 @@ -2093,8 +2116,7 @@ class LXMRouter: self.client_propagation_messages_received += 1 self.lxmf_propagation(lxmf_data, from_peer=peer) - if peer != None: - peer.queue_handled_message(transient_id) + if peer != None: peer.queue_handled_message(transient_id) else: RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) @@ -2121,10 +2143,9 @@ class LXMRouter: if peer != from_peer: peer.queue_unhandled_message(transient_id) - def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None): - no_stamp_enforcement = False - if is_paper_message: - no_stamp_enforcement = True + def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None, stamp_value=None): + if is_paper_message: no_stamp_enforcement = True + else: no_stamp_enforcement = False try: if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD: @@ -2150,13 +2171,13 @@ class LXMRouter: else: if self.propagation_node: - file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received) - msg_file = open(file_path, "wb") - msg_file.write(lxmf_data) - msg_file.close() + value_component = f"_{stamp_value}" if stamp_value and stamp_value > 0 else "" + file_path = f"{self.messagepath}/{RNS.hexrep(transient_id, delimit=False)}_{received}{value_component}" + msg_file = open(file_path, "wb") + msg_file.write(lxmf_data); msg_file.close() RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME) - self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []] + self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], [], stamp_value] self.enqueue_peer_distribution(transient_id, from_peer) else: diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 2db0598..ecf75a3 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -17,12 +17,10 @@ def stamp_workblock(message_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): wb_st = time.time() workblock = b"" for n in range(expand_rounds): - workblock += RNS.Cryptography.hkdf( - length=256, - derive_from=message_id, - salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)), - context=None, - ) + workblock += RNS.Cryptography.hkdf(length=256, + derive_from=message_id, + salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)), + context=None) wb_time = time.time() - wb_st # RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)