Improved auto-peering on inbound PN sync. Added support for persisting and loading transient message stamp status. Implemented getting transient message stamp value.

This commit is contained in:
Mark Qvist 2025-10-30 21:19:38 +01:00
commit c84aea745a
4 changed files with 90 additions and 70 deletions

View file

@ -149,7 +149,8 @@ def stamp_cost_from_app_data(app_data=None):
def pn_announce_data_is_valid(data):
try:
if type(data) == bytes: data = msgpack.unpackb(data)
if type(data) != bytes: return False
else: data = msgpack.unpackb(data)
if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data")
else:
try: int(data[1])

View file

@ -259,7 +259,7 @@ class LXMPeer:
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug
for unhandled_entry in unhandled_entries:
transient_id = unhandled_entry[0]
weight = unhandled_entry[1]

View file

@ -15,6 +15,7 @@ import RNS.vendor.umsgpack as msgpack
from .LXMF import APP_NAME
from .LXMF import FIELD_TICKET
from .LXMF import pn_announce_data_is_valid
from .LXMPeer import LXMPeer
from .LXMessage import LXMessage
@ -285,13 +286,13 @@ class LXMRouter:
node_state = self.propagation_node and not self.from_static_only
stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility]
metadata = {}
announce_data = [ False, # Legacy LXMF PN support
int(time.time()), # Current node timebase
node_state, # Boolean flag signalling propagation node state
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
self.propagation_per_sync_limit, # Limit for incoming propagation node syncs
stamp_cost, # Propagation stamp cost for this node
metadata ] # Node metadata
announce_data = [ False, # 0: Legacy LXMF PN support
int(time.time()), # 1: Current node timebase
node_state, # 2: Boolean flag signalling propagation node state
self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes
self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs
stamp_cost, # 5: Propagation stamp cost for this node
metadata ] # 6: Node metadata
data = msgpack.packb(announce_data)
self.propagation_destination.announce(app_data=data)
@ -486,19 +487,21 @@ class LXMRouter:
st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE)
for filename in os.listdir(self.messagepath):
components = filename.split("_")
if len(components) == 2:
if len(components) >= 2:
if float(components[1]) > 0:
if len(components[0]) == RNS.Identity.HASHLENGTH//8*2:
try:
transient_id = bytes.fromhex(components[0])
received = float(components[1])
filepath = self.messagepath+"/"+filename
msg_size = os.path.getsize(filepath)
file = open(filepath, "rb")
destination_hash = file.read(LXMessage.DESTINATION_LENGTH)
transient_id = bytes.fromhex(components[0])
received = float(components[1])
filepath = self.messagepath+"/"+filename
msg_size = os.path.getsize(filepath)
file = open(filepath, "rb")
destination_hash = file.read(LXMessage.DESTINATION_LENGTH)
file.close()
if len(components) >= 3: stamp_value = int(components[2])
else: stamp_value = None
self.propagation_entries[transient_id] = [
destination_hash, # 0: Destination hash
filepath, # 1: Storage location
@ -506,6 +509,7 @@ class LXMRouter:
msg_size, # 3: Message size
[], # 4: Handled peers
[], # 5: Unhandled peers
stamp_value, # 6: Stamp value
]
except Exception as e:
@ -923,22 +927,26 @@ class LXMRouter:
return msgpack.packb(peer_data)
def get_weight(self, transient_id):
dst_hash = self.propagation_entries[transient_id][0]
lxm_rcvd = self.propagation_entries[transient_id][2]
def get_size(self, transient_id):
    """Return the stored size in bytes of the message identified by transient_id."""
    # Slot 3 of a propagation entry is the message size recorded when the
    # message was indexed into the store.
    return self.propagation_entries[transient_id][3]
now = time.time()
def get_weight(self, transient_id):
    """Compute the distribution weight of a stored propagation message.

    The weight grows linearly with message age (in four-day units, floored
    at 1.0) and with message size, and is reduced by a factor of ten for
    destinations present in the prioritised list, so prioritised and fresh
    small messages sort first.
    """
    # The span contained a duplicated if/else and an unreachable second
    # return left over from an edit; consolidated to one code path with a
    # single entry lookup.
    entry = self.propagation_entries[transient_id]
    dst_hash = entry[0]  # 0: destination hash
    lxm_rcvd = entry[2]  # 2: receive timestamp
    lxm_size = entry[3]  # 3: message size in bytes

    # Age factor: 1.0 for anything younger than four days, linear afterwards.
    age_weight = max(1, (time.time() - lxm_rcvd)/60/60/24/4)

    # Prioritised destinations weigh an order of magnitude lighter.
    priority_weight = 0.1 if dst_hash in self.prioritised_list else 1.0

    return priority_weight * age_weight * lxm_size
def get_stamp_value(self, transient_id):
    """Return the stored stamp value for transient_id, or None if unknown.

    Slot 6 of a propagation entry holds the stamp value persisted when the
    message was received; it may itself be None for unstamped messages.
    """
    # Single dict lookup via .get instead of a membership test followed by
    # indexing (also replaces the non-idiomatic `not x in y` form).
    entry = self.propagation_entries.get(transient_id)
    return None if entry is None else entry[6]
def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY):
now = time.time()
@ -1003,10 +1011,6 @@ class LXMRouter:
else:
return available_tickets
def get_size(self, transient_id):
lxm_size = self.propagation_entries[transient_id][3]
return lxm_size
def clean_message_store(self):
RNS.log("Cleaning message store", RNS.LOG_VERBOSE)
# Check and remove expired messages
@ -1993,18 +1997,22 @@ class LXMRouter:
def propagation_packet(self, data, packet):
try:
if packet.destination_type != RNS.Destination.LINK:
pass
if packet.destination_type != RNS.Destination.LINK: return
else:
data = msgpack.unpackb(data)
data = msgpack.unpackb(data)
remote_timebase = data[0]
messages = data[1]
#######################################
# TODO: Check propagation stamps here #
#######################################
stamps_valid = False
messages = data[1]
for lxmf_data in messages:
self.lxmf_propagation(lxmf_data)
self.client_propagation_messages_received += 1
packet.prove()
if stamps_valid: packet.prove()
except Exception as e:
RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR)
@ -2053,34 +2061,49 @@ class LXMRouter:
if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list:
# This is a series of propagation messages from a peer or originator
remote_timebase = data[0]
remote_hash = None
remote_str = "unknown peer"
remote_identity = resource.link.get_remote_identity()
remote_timebase = data[0]
messages = data[1]
remote_hash = None
remote_str = "unknown peer"
if remote_identity != None:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
remote_hash = remote_destination.hash
remote_app_data = RNS.Identity.recall_app_data(remote_hash)
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.peers:
if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
# TODO: Query cache for an announce and get propagation
# transfer limit from that. For now, initialise it to a
# sane default value, and wait for an announce to arrive
# that will update the peering config to the actual limit.
propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4
wanted_inbound_peers = None
self.peer(remote_hash, remote_timebase, propagation_transfer_limit, wanted_inbound_peers)
if remote_hash in self.peers: remote_str = f"peer {remote_str}"
else:
remote_str = f"peer {remote_str}"
if pn_announce_data_is_valid(remote_app_data):
# 1: Current node timebase
# 2: Boolean flag signalling propagation node state
# 3: Per-transfer limit for message propagation in kilobytes
# 4: Limit for incoming propagation node syncs
# 5: Propagation stamp cost for this node
# 6: Node metadata
if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
remote_timebase = remote_app_data[1]
remote_transfer_limit = remote_app_data[3]
remote_sync_limit = remote_app_data[4]
remote_stamp_cost = remote_app_data[5][0]
remote_stamp_flex = remote_app_data[5][1]
remote_metadata = remote_app_data[6]
RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug
self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_metadata)
messages = data[1]
ms = "" if len(messages) == 1 else "s"
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE)
#######################################
# TODO: Check propagation stamps here #
#######################################
for lxmf_data in messages:
peer = None
transient_id = RNS.Identity.full_hash(lxmf_data)
if remote_hash != None and remote_hash in self.peers:
peer = self.peers[remote_hash]
peer.incoming += 1
@ -2093,8 +2116,7 @@ class LXMRouter:
self.client_propagation_messages_received += 1
self.lxmf_propagation(lxmf_data, from_peer=peer)
if peer != None:
peer.queue_handled_message(transient_id)
if peer != None: peer.queue_handled_message(transient_id)
else:
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
@ -2121,10 +2143,9 @@ class LXMRouter:
if peer != from_peer:
peer.queue_unhandled_message(transient_id)
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None):
no_stamp_enforcement = False
if is_paper_message:
no_stamp_enforcement = True
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None, stamp_value=None):
if is_paper_message: no_stamp_enforcement = True
else: no_stamp_enforcement = False
try:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
@ -2150,13 +2171,13 @@ class LXMRouter:
else:
if self.propagation_node:
file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received)
msg_file = open(file_path, "wb")
msg_file.write(lxmf_data)
msg_file.close()
value_component = f"_{stamp_value}" if stamp_value and stamp_value > 0 else ""
file_path = f"{self.messagepath}/{RNS.hexrep(transient_id, delimit=False)}_{received}{value_component}"
msg_file = open(file_path, "wb")
msg_file.write(lxmf_data); msg_file.close()
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME)
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], [], stamp_value]
self.enqueue_peer_distribution(transient_id, from_peer)
else:

View file

@ -17,12 +17,10 @@ def stamp_workblock(message_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
wb_st = time.time()
workblock = b""
for n in range(expand_rounds):
workblock += RNS.Cryptography.hkdf(
length=256,
derive_from=message_id,
salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)),
context=None,
)
workblock += RNS.Cryptography.hkdf(length=256,
derive_from=message_id,
salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)),
context=None)
wb_time = time.time() - wb_st
# RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)