Added ability to use remote management with configurable identities

This commit is contained in:
Laura Batalha 2025-05-19 16:44:42 +01:00
commit 9c98ffe96d
No known key found for this signature in database
GPG key ID: FF87260D8C08DA64
2 changed files with 89 additions and 52 deletions

View file

@ -74,7 +74,7 @@ class LXMRouter:
def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, enforce_ratchets=False,
enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False):
enforce_stamps=False, static_peers = [], max_peers=None, from_static_only=False, management_identities = []):
random.seed(os.urandom(10))
@ -140,6 +140,8 @@ class LXMRouter:
identity = RNS.Identity()
self.identity = identity
self.management_identities = management_identities
self.management_identities.append(identity.hash)
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.control_destination = None
self.client_propagation_messages_received = 0
@ -339,7 +341,7 @@ class LXMRouter:
delivery_destination.stamp_cost = stamp_cost
else:
return False
return True
return False
@ -506,7 +508,7 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st))
RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE)
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
@ -582,7 +584,7 @@ class LXMRouter:
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=self.management_identities)
if self.message_storage_limit != None:
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
@ -636,7 +638,7 @@ class LXMRouter:
self.message_storage_limit = int(limit_bytes)
else:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
except Exception as e:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
@ -669,7 +671,7 @@ class LXMRouter:
self.information_storage_limit = int(limit_bytes)
else:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
except Exception as e:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
@ -750,7 +752,7 @@ class LXMRouter:
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash:
elif remote_identity.hash not in self.management_identities:
return LXMPeer.ERROR_NO_ACCESS
else:
return self.compile_stats()
@ -846,7 +848,7 @@ class LXMRouter:
for link in inactive_links:
self.active_propagation_links.remove(link)
link.teardown()
except Exception as e:
RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -889,7 +891,7 @@ class LXMRouter:
def update_stamp_cost(self, destination_hash, stamp_cost):
RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG)
self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost]
def job():
self.save_outbound_stamp_costs()
threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
@ -902,7 +904,7 @@ class LXMRouter:
def get_announce_app_data(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
display_name = None
if delivery_destination.display_name != None:
display_name = delivery_destination.display_name.encode("utf-8")
@ -928,7 +930,7 @@ class LXMRouter:
priority_weight = 0.1
else:
priority_weight = 1.0
weight = priority_weight * age_weight * lxm_size
return weight
@ -950,7 +952,7 @@ class LXMRouter:
if validity_left > LXMessage.TICKET_RENEW:
RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG)
return [expires, ticket]
else:
self.available_tickets["inbound"][destination_hash] = {}
@ -1018,7 +1020,7 @@ class LXMRouter:
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
removed_entries[transient_id] = filepath
removed_count = 0
for transient_id in removed_entries:
try:
@ -1062,15 +1064,15 @@ class LXMRouter:
if os.path.isfile(filepath):
os.unlink(filepath)
self.propagation_entries.pop(transient_id)
bytes_cleaned += entry[3]
RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME)
except Exception as e:
RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
finally:
i += 1
@ -1120,7 +1122,7 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def clean_outbound_stamp_costs(self):
try:
@ -1132,7 +1134,7 @@ class LXMRouter:
for destination_hash in expired:
self.outbound_stamp_costs.pop(destination_hash)
except Exception as e:
RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
@ -1173,7 +1175,7 @@ class LXMRouter:
for inbound_ticket in expired_inbound:
self.available_tickets["inbound"][destination_hash].pop(inbound_ticket)
except Exception as e:
RNS.log(f"Error while cleaning available tickets. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
@ -1210,7 +1212,7 @@ class LXMRouter:
if not "last_deliveries" in self.available_tickets:
RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["last_deliveries"] = {}
except Exception as e:
RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)
@ -1290,7 +1292,7 @@ class LXMRouter:
### Message Download ##################################
#######################################################
def request_messages_path_job(self):
job_thread = threading.Thread(target=self.__request_messages_path_job)
job_thread.setDaemon(True)
@ -1306,21 +1308,21 @@ class LXMRouter:
else:
RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG)
self.acknowledge_sync_completion(failure_state=LXMRouter.PR_NO_PATH)
def identity_allowed(self, identity):
if self.auth_required:
if identity.hash in self.allowed_list:
return True
else:
return False
else:
return True
def message_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif not self.identity_allowed(remote_identity):
return LXMPeer.ERROR_NO_ACCESS
@ -1357,7 +1359,7 @@ class LXMRouter:
self.propagation_entries.pop(transient_id)
os.unlink(filepath)
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -1400,7 +1402,7 @@ class LXMRouter:
self.client_propagation_messages_served += len(response_messages)
return response_messages
except Exception as e:
RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
@ -1512,7 +1514,7 @@ class LXMRouter:
return True
else:
return False
def cancel_outbound(self, message_id):
try:
if message_id in self.pending_deferred_stamps:
@ -1600,7 +1602,7 @@ class LXMRouter:
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].progress
return None
def get_outbound_lxm_stamp_cost(self, lxm_hash):
@ -1611,7 +1613,7 @@ class LXMRouter:
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].stamp_cost
return None
@ -1788,7 +1790,7 @@ class LXMRouter:
peer.last_heard = time.time()
peer.propagation_transfer_limit = propagation_transfer_limit
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
else:
if len(self.peers) < self.max_peers:
peer = LXMPeer(self, destination_hash)
@ -1826,7 +1828,7 @@ class LXMRouter:
if len(untested_peers) >= rotation_headroom:
RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
return
fully_synced_peers = {}
for peer_id in peers:
peer = peers[peer_id]
@ -1916,7 +1918,7 @@ class LXMRouter:
reverse=True
)[0:min(LXMRouter.FASTEST_N_RANDOM_POOL, len(waiting_peers))]
peer_pool.extend(fastest_peers)
unknown_speed_peers = [p for p in waiting_peers if p.sync_transfer_rate == 0]
if len(unknown_speed_peers) > 0:
peer_pool.extend(
@ -1928,11 +1930,11 @@ class LXMRouter:
)
RNS.log("Selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
elif len(unresponsive_peers) > 0:
RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG)
peer_pool = unresponsive_peers
if len(peer_pool) > 0:
selected_index = random.randint(0,len(peer_pool)-1)
selected_peer = peer_pool[selected_index]
@ -2084,7 +2086,7 @@ class LXMRouter:
else:
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
RNS.trace_exception(e)
@ -2175,7 +2177,7 @@ class LXMRouter:
else:
lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
transient_id = RNS.Identity.full_hash(lxmf_data)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
if router_propagation_result != False:
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
@ -2225,7 +2227,7 @@ class LXMRouter:
self.pending_deferred_stamps.pop(selected_message_id)
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
selected_lxm.failed_callback(lxmessage)
return
RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
@ -2338,7 +2340,7 @@ class LXMRouter:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for

View file

@ -78,7 +78,7 @@ def apply_config():
active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60
else:
active_configuration["peer_announce_interval"] = None
if "lxmf" in lxmd_config and "delivery_transfer_max_accepted_size" in lxmd_config["lxmf"]:
active_configuration["delivery_transfer_max_accepted_size"] = lxmd_config["lxmf"].as_float("delivery_transfer_max_accepted_size")
if active_configuration["delivery_transfer_max_accepted_size"] < 0.38:
@ -102,6 +102,27 @@ def apply_config():
else:
active_configuration["auth_required"] = False
if "propagation" in lxmd_config \
and "remote_management_enabled" in lxmd_config["propagation"] \
and "remote_management_identities" in lxmd_config["propagation"] \
and lxmd_config["propagation"].as_bool("remote_management_enabled"):
idents = lxmd_config["propagation"].as_list("remote_management_identities")
allowed_hashes = []
for hexhash in idents:
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2
if len(hexhash) != dest_len:
raise ValueError(f"Identity hash length for remote management ACL {hexhash} is invalid, must be {dest_len} hexadecimal characters ({dest_len//2} bytes).")
try:
ident_hash = bytes.fromhex(hexhash)
except Exception as e:
raise ValueError(f"Invalid identity hash for remote management ACL: {hexhash}")
allowed_hashes.append(ident_hash)
active_configuration["remote_management_identities"] = allowed_hashes
else:
active_configuration["remote_management_identities"] = []
if "propagation" in lxmd_config and "announce_at_start" in lxmd_config["propagation"]:
active_configuration["node_announce_at_start"] = lxmd_config["propagation"].as_bool("announce_at_start")
else:
@ -128,14 +149,14 @@ def apply_config():
active_configuration["message_storage_limit"] = 0.005
else:
active_configuration["message_storage_limit"] = 500
if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]:
active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size")
if active_configuration["propagation_transfer_max_accepted_size"] < 0.38:
active_configuration["propagation_transfer_max_accepted_size"] = 0.38
else:
active_configuration["propagation_transfer_max_accepted_size"] = 256
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
else:
@ -278,7 +299,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
RNS.log("Could not parse the configuration at "+configpath, RNS.LOG_ERROR)
RNS.log("Check your configuration file for errors!", RNS.LOG_ERROR)
RNS.panic()
apply_config()
RNS.log("Configuration loaded from "+configpath, RNS.LOG_VERBOSE)
@ -287,7 +308,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
if verbosity != 0 or quietness != 0:
targetloglevel = targetloglevel+verbosity-quietness
# Start Reticulum
RNS.log("Substantiating Reticulum...")
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
@ -315,7 +336,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
RNS.log("Could not create and save a new Primary Identity", RNS.LOG_ERROR)
RNS.log("The contained exception was: %s" % (str(e)), RNS.LOG_ERROR)
exit(2)
# Start LXMF
message_router = LXMF.LXMRouter(
identity = identity,
@ -326,8 +347,9 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
delivery_limit = active_configuration["delivery_transfer_max_accepted_size"],
max_peers = active_configuration["max_peers"],
static_peers = active_configuration["static_peers"],
from_static_only = active_configuration["from_static_only"])
from_static_only = active_configuration["from_static_only"],
management_identities = active_configuration["remote_management_identities"])
message_router.register_delivery_callback(lxmf_delivery)
for destination_hash in active_configuration["ignored_lxmf_destinations"]:
@ -348,7 +370,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
if len(active_configuration["allowed_identities"]) == 0:
RNS.log("Clint authentication was enabled, but no identity hashes could be loaded from "+str(allowedpath)+". Nobody will be able to sync messages from this propagation node.", RNS.LOG_WARNING)
for identity_hash in active_configuration["allowed_identities"]:
message_router.allow(identity_hash)
@ -368,7 +390,8 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
message_router.enable_propagation()
RNS.log("LXMF Propagation Node started on "+RNS.prettyhexrep(message_router.propagation_destination.hash))
if len(message_router.management_identities) > 1:
RNS.log(f"Propagation Node remote management is enabled for {len(message_router.management_identities)-1} identities")
RNS.log("Started lxmd version {version}".format(version=__version__), RNS.LOG_NOTICE)
threading.Thread(target=deferred_start_jobs, daemon=True).start()
@ -379,7 +402,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo
def jobs():
global active_configuration, last_peer_announce, last_node_announce
global message_router, lxmf_destination
while True:
try:
if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None:
@ -491,13 +514,13 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness =
identity = RNS.Identity.from_file(identity_path)
if identity == None:
RNS.log("Could not load the Primary Identity from "+identity_path, RNS.LOG_ERROR)
exit(4)
exit(4)
if targetloglevel == None:
targetloglevel = 3
if verbosity != 0 or quietness != 0:
targetloglevel = targetloglevel+verbosity-quietness
reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest)
response = query_status(identity, timeout=timeout, exit_on_fail=True)
@ -616,7 +639,7 @@ def main():
parser.add_argument("--identity", action="store", default=None, help="path to identity used for query request", type=str)
parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit")
parser.add_argument("--version", action="version", version="lxmd {version}".format(version=__version__))
args = parser.parse_args()
if args.exampleconfig:
@ -738,6 +761,18 @@ propagation_transfer_max_accepted_size = 256
auth_required = no
# It is possible to allow remote management of lxmf
# propagation nodes using various utilities, such as
# lxmd --status. You will need to specify a comma
# separated list of one or more Reticulum Identity
# hashes for authenticating the queries from client
# programs. For this purpose, you can use existing
# identity files, or generate new ones with the rnid utility.
# The node's own identity is always allowed regardless
# of these configuration parameters.
# remote_management_enabled = no
# remote_management_identities = 41d20c727598a3fbbdf9106133a3a0ed, d924b81822ca24e68e2effea99bcb8cf
[lxmf]