diff --git a/RNS/Discovery.py b/RNS/Discovery.py index 363c1b0..587e6ed 100644 --- a/RNS/Discovery.py +++ b/RNS/Discovery.py @@ -375,3 +375,89 @@ class InterfaceDiscovery(): return if self.discovery_callback and callable(self.discovery_callback): self.discovery_callback(info) + +class BlackholeUpdater(): + INITIAL_WAIT = 20 + JOB_INTERVAL = 60 + UPDATE_INTERVAL = 3*60*60 + SOURCE_TIMEOUT = 25 + + def __init__(self): + self.last_updates = {} + self.should_run = False + self.job_interval = self.JOB_INTERVAL + self.update_lock = threading.Lock() + + def start(self): + if not self.should_run: + source_count = len(RNS.Reticulum.blackhole_sources()) + ms = "" if source_count == 1 else "s" + RNS.log(f"Starting blackhole updater with {source_count} source{ms}", RNS.LOG_DEBUG) + self.should_run = True + threading.Thread(target=self.job, daemon=True).start() + + def stop(self): self.should_run = False + + def update_link_established(self, link): + remote_identity = link.get_remote_identity() + RNS.log(f"Link established for blackhole list update from {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG) + receipt = link.request("/list") + while not receipt.concluded(): time.sleep(0.2) + response = receipt.get_response() + link.teardown() + + if type(response) == dict: blackhole_list = response + else: blackhole_list = None + + if blackhole_list: + added = 0 + for identity_hash in blackhole_list: + entry = blackhole_list[identity_hash] + if not identity_hash in RNS.Transport.blackholed_identities: + RNS.Transport.blackholed_identities[identity_hash] = entry + added += 1 + + if added > 0: + spec = "identity" if added == 1 else "identities" + RNS.log(f"Added {added} blackholed {spec} from {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG) + + try: + sourcelistpath = os.path.join(RNS.Reticulum.blackholepath, RNS.hexrep(remote_identity.hash, delimit=False)) + tmppath = f"{sourcelistpath}.tmp" + with open(tmppath, "wb") as f: f.write(msgpack.packb(blackhole_list)) + if os.path.isfile(sourcelistpath): os.unlink(sourcelistpath) + os.rename(tmppath, sourcelistpath) + + except Exception as e: + RNS.log(f"Error while persisting blackhole list from {RNS.prettyhexrep(remote_identity.hash)}: {e}", RNS.LOG_ERROR) + + RNS.log(f"Blackhole list update from {RNS.prettyhexrep(remote_identity.hash)} completed", RNS.LOG_DEBUG) + + def job(self): + time.sleep(self.INITIAL_WAIT) + while self.should_run: + try: + now = time.time() + for identity_hash in RNS.Reticulum.blackhole_sources(): + if identity_hash in self.last_updates: last_update = self.last_updates[identity_hash] + else: last_update = 0 + + if now > last_update+self.UPDATE_INTERVAL: + try: + destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.info.blackhole", identity_hash) + RNS.log(f"Attempting blackhole list update from {RNS.prettyhexrep(identity_hash)}...", RNS.LOG_DEBUG) + if not RNS.Transport.await_path(destination_hash): RNS.log(f"No path available for blackhole list update from {RNS.prettyhexrep(identity_hash)}, retrying later", RNS.LOG_VERBOSE) + else: + remote_identity = RNS.Identity.recall(destination_hash) + destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "rnstransport", "info", "blackhole") + RNS.Link(destination, established_callback=self.update_link_established) + self.last_updates[identity_hash] = time.time() + + except Exception as e: + RNS.log(f"Error while establishing link for blackhole list update from {RNS.prettyhexrep(identity_hash)}: {e}", RNS.LOG_ERROR) + + except Exception as e: + RNS.log(f"Error in blackhole list updater job: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + time.sleep(self.job_interval) \ No newline at end of file diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index de73b49..7755f77 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -337,8 +337,9 @@ class Reticulum: thread.start() if self.is_shared_instance or self.is_standalone_instance: - if Reticulum.__discovery_enabled: RNS.Transport.enable_discovery() + if Reticulum.__discovery_enabled: RNS.Transport.enable_discovery() if Reticulum.__discover_interfaces: RNS.Transport.discover_interfaces() + if Reticulum.__blackhole_sources: RNS.Transport.enable_blackhole_updater() atexit.register(Reticulum.exit_handler) signal.signal(signal.SIGINT, Reticulum.sigint_handler) diff --git a/RNS/Transport.py b/RNS/Transport.py index df11ec5..2dcc1d0 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -142,6 +142,7 @@ class Transport: start_time = None jobs_locked = False jobs_running = False + hashlist_maxsize = 1000000 job_interval = 0.250 links_last_checked = 0.0 links_check_interval = 1.0 @@ -152,14 +153,15 @@ class Transport: pending_prs_last_checked = 0.0 pending_prs_check_interval = 30.0 cache_last_cleaned = 0.0 - cache_clean_interval = 300.0 - hashlist_maxsize = 1000000 + cache_clean_interval = 5*60 tables_last_culled = 0.0 tables_cull_interval = 5.0 interface_last_jobs = 0.0 interface_jobs_interval = 5.0 last_mgmt_announce = 0 mgmt_announce_interval = 2*60*60 + blackhole_last_checked = 0 + blackhole_check_interval = 60 inbound_announce_lock = Lock() interface_announcer = None discovery_handler = None @@ -388,6 +390,12 @@ class Transport: if not Transport.discovery_handler: Transport.discovery_handler = RNS.Discovery.InterfaceDiscovery(required_value=RNS.Reticulum.required_discovery_value(), discover_interfaces=True) + @staticmethod + def enable_blackhole_updater(): + if not Transport.blackhole_updater: + Transport.blackhole_updater = RNS.Discovery.BlackholeUpdater() + Transport.blackhole_updater.start() + @staticmethod def count_traffic_loop(): while True: @@ -815,6 +823,32 @@ class Transport: except Exception as e: RNS.log(f"Error while sending management announces: {e}", RNS.LOG_ERROR) + # Check expired blackhole entries + if time.time() > Transport.blackhole_last_checked+Transport.blackhole_check_interval: + try: + Transport.blackhole_last_checked = time.time() + stale_blackhole_entries = [] + for identity_hash in Transport.blackholed_identities.copy(): + try: + until = Transport.blackholed_identities[identity_hash]["until"] + if until and time.time() > until: stale_blackhole_entries.append(identity_hash) + + except Exception as e: + RNS.log(f"Error while checking blackhole expiry for {RNS.prettyhexrep(identity_hash)}: {e}", RNS.LOG_ERROR) + + i = 0 + for identity_hash in stale_blackhole_entries: + if identity_hash in Transport.blackholed_identities: + Transport.blackholed_identities.pop(identity_hash) + i += 1 + + if i > 0: + if i == 1: RNS.log("Removed "+str(i)+" blackholed identity", RNS.LOG_VERBOSE) + else: RNS.log("Removed "+str(i)+" blackholed identities", RNS.LOG_VERBOSE) + + except Exception as e: + RNS.log(f"Error while checking blackholed identities: {e}", RNS.LOG_ERROR) + if should_collect: gc.collect() else: @@ -3137,6 +3171,9 @@ class Transport: else: if len(filename) != dest_len: raise ValueError(f"Identity hash length for blackhole source {filename} is invalid") source_identity_hash = bytes.fromhex(filename) + if not source_identity_hash in RNS.Reticulum.blackhole_sources(): + RNS.log(f"Skipping disabled blackhole source {RNS.prettyhexrep(source_identity_hash)}", RNS.LOG_INFO) + continue sourcepath = os.path.join(RNS.Reticulum.blackholepath, filename) with open(sourcepath, "rb") as f: diff --git a/RNS/Utilities/rnpath.py b/RNS/Utilities/rnpath.py index c82c632..972277e 100644 --- a/RNS/Utilities/rnpath.py +++ b/RNS/Utilities/rnpath.py @@ -185,7 +185,7 @@ def program_setup(configdir, table, rates, drop, destination_hexhash, verbosity, until = blackholed_list[identity_hash]["until"] reason = blackholed_list[identity_hash]["reason"] source = blackholed_list[identity_hash]["source"] - until_str = f"for {RNS.prettytime(until-now)}" if until else "indefinitely" + until_str = f"for {RNS.prettytime(max(0, until-now))}" if until else "indefinitely" reason_str = f" ({trunc(reason)})" if reason else "" by_str = f" by {RNS.prettyhexrep(source)}" if source != RNS.Transport.identity.hash else "" filter_str = f"{RNS.prettyhexrep(identity_hash)} {until_str} {reason_str} {by_str}" @@ -494,7 +494,7 @@ def main(): parser.add_argument("-b", "--blackholed", action="store_true", help="list blackholed identities", default=False) parser.add_argument("-B", "--blackhole", action="store_true", help="blackhole identity", default=False) parser.add_argument("-U", "--unblackhole", action="store_true", help="unblackhole identity", default=False) - parser.add_argument( "--duration", action="store", type=int, help="duration of blackhole enforcement in hours", default=None) + parser.add_argument( "--duration", action="store", type=float, help="duration of blackhole enforcement in hours", default=None) parser.add_argument( "--reason", action="store", type=str, help="reason for blackholing identity", default=None) parser.add_argument("-p", "--blackholed-list", action="store_true", help="view published blackhole list for remote transport instance", default=False) parser.add_argument("-j", "--json", action="store_true", help="output in JSON format", default=False)