From 813467243603517c2938763eaad937a05a07d8bc Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 9 Sep 2024 02:12:27 +0200 Subject: [PATCH] Cleanup --- LXMF/LXMRouter.py | 1 - LXMF/LXMessage.py | 205 ++++++++++++++++++++++----------------- docs/example_receiver.py | 2 + 3 files changed, 118 insertions(+), 90 deletions(-) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 260911c..199ff63 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1696,7 +1696,6 @@ class LXMRouter: if len(self.pending_deferred_stamps) > 0: if self.stamp_gen_lock.locked(): - RNS.log(f"A stamp is already generating, returning...", RNS.LOG_DEBUG) # TODO: Remove return else: diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 3564260..b70c66f 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -335,7 +335,121 @@ class LXMessage: start_time = time.time() total_rounds = 0 - if not RNS.vendor.platformutils.is_android(): + if RNS.vendor.platformutils.is_windows() or RNS.vendor.platformutils.is_darwin(): + platform = RNS.vendor.platformutils.get_platform() + RNS.log(f"Running stamp generation on {platform}, work limited to single CPU core. This will be slow.", RNS.LOG_WARNING) + rounds = 0 + pstamp = os.urandom(256//8) + + def sv(s, c, w): + target = 0b1<<256-c; m = w+s + result = RNS.Identity.full_hash(m) + if int.from_bytes(result, byteorder="big") > target: + return False + else: + return True + + while not sv(pstamp, self.stamp_cost, workblock): + pstamp = os.urandom(256//8); rounds += 1 + + stamp = pstamp + duration = time.time() - start_time + + elif RNS.vendor.platformutils.is_android(): + # Semaphore support is flaky to non-existent on + # Android, so we need to manually dispatch and + # manage workloads here, while periodically + # checking in on the progress. + + use_nacl = False + rounds_per_worker = 1000 + try: + import nacl.encoding + import nacl.hash + use_nacl = True + except: + pass + + if use_nacl: + def full_hash(m): + return nacl.hash.sha256(m, encoder=nacl.encoding.RawEncoder) + else: + def full_hash(m): + return RNS.Identity.full_hash(m) + + def sv(s, c, w): + target = 0b1<<256-c + m = w+s + result = full_hash(m) + if int.from_bytes(result, byteorder="big") > target: + return False + else: + return True + + stamp = None + wm = multiprocessing.Manager() + jobs = multiprocessing.cpu_count() + + RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove + + results_dict = wm.dict() + while stamp == None: + job_procs = [] + + def job(procnum=None, results_dict=None, wb=None, sc=None, jr=None): + # RNS.log(f"Worker {procnum} starting for {jr} rounds...") # TODO: Remove + try: + rounds = 0 + found_stamp = None + found_time = None + + while True: + pstamp = os.urandom(256//8) + rounds += 1 + if sv(pstamp, sc, wb): + found_stamp = pstamp + found_time = time.time() + break + + if rounds >= jr: + # RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove + break + + results_dict[procnum] = [found_stamp, rounds, found_time] + except Exception as e: + RNS.log("WORKER ERROR") + RNS.trace_exception(e) + + try: + for pnum in range(jobs): + pargs = {"procnum":pnum, "results_dict": results_dict, "wb": workblock, "sc":self.stamp_cost, "jr":rounds_per_worker} + process = multiprocessing.Process(target=job, kwargs=pargs) + job_procs.append(process) + process.start() + + for process in job_procs: + process.join() + + for j in results_dict: + r = results_dict[j] + total_rounds += r[1] + if r[0] != None: + stamp = r[0] + found_time = r[2] + + if stamp == None: + elapsed = time.time() - start_time + speed = total_rounds/elapsed + RNS.log(f"Stamp generation for {self} running. {total_rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG) + + except Exception as e: + RNS.log("ERROR") + RNS.trace_exception(e) + + duration = time.time() - start_time + rounds = total_rounds + + else: allow_kill = True stamp = None jobs = multiprocessing.cpu_count() @@ -417,94 +531,7 @@ class LXMessage: rounds = total_rounds - else: - # Semaphore support is flaky to non-existent on - # Android, so we need to manually dispatch and - # manage workloads here, while periodically - # checking in on the progress. - - use_nacl = False - rounds_per_worker = 1000 - if RNS.vendor.platformutils.is_android(): - rounds_per_worker = 500 - try: - import nacl.encoding - import nacl.hash - use_nacl = True - except: - pass - - if use_nacl: - def full_hash(m): - return nacl.hash.sha256(m, encoder=nacl.encoding.RawEncoder) - else: - def full_hash(m): - return RNS.Identity.full_hash(m) - - def sv(s, c, w): - target = 0b1<<256-c - m = w+s - result = full_hash(m) - if int.from_bytes(result, byteorder="big") > target: - return False - else: - return True - - stamp = None - wm = multiprocessing.Manager() - jobs = multiprocessing.cpu_count() - - RNS.log(f"Dispatching {jobs} workers for stamp generation...", RNS.LOG_DEBUG) # TODO: Remove - - results_dict = wm.dict() - while stamp == None: - job_procs = [] - - def job(procnum=None, results_dict=None, wb=None, sc=None, jr=None): - RNS.log(f"Worker {procnum} starting for {jr} rounds...") # TODO: Remove - rounds = 0 - found_stamp = None - found_time = None - - while True: - pstamp = os.urandom(256//8) - rounds += 1 - if sv(pstamp, sc, wb): - found_stamp = pstamp - found_time = time.time() - break - - if rounds >= jr: - # RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove - break - - results_dict[procnum] = [found_stamp, rounds, found_time] - - for pnum in range(jobs): - pargs = {"procnum":pnum, "results_dict": results_dict, "wb": workblock, "sc":self.stamp_cost, "jr":rounds_per_worker} - process = multiprocessing.Process(target=job, kwargs=pargs) - job_procs.append(process) - process.start() - - for process in job_procs: - process.join() - - for j in results_dict: - r = results_dict[j] - total_rounds += r[1] - if r[0] != None: - stamp = r[0] - found_time = r[2] - - if stamp == None: - elapsed = found_time - start_time - speed = total_rounds/elapsed - RNS.log(f"Stamp generation for {self} running. {total_rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG) - - duration = time.time() - start_time - rounds = total_rounds - - speed = total_rounds/duration + speed = rounds/duration RNS.log(f"Stamp generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG) diff --git a/docs/example_receiver.py b/docs/example_receiver.py index 7d3aa20..999f6a3 100644 --- a/docs/example_receiver.py +++ b/docs/example_receiver.py @@ -32,6 +32,8 @@ def delivery_callback(message): RNS.log("\t| Title : "+message.title_as_string()) RNS.log("\t| Content : "+message.content_as_string()) RNS.log("\t| Fields : "+str(message.fields)) + if message.ratchet_id: + RNS.log("\t| Ratchet : "+str(RNS.Identity._get_ratchet_id(message.ratchet_id))) RNS.log("\t| Message signature : "+signature_string) RNS.log("\t| Stamp : "+stamp_string) RNS.log("\t+---------------------------------------------------------------")