diff --git a/LXMF/Handlers.py b/LXMF/Handlers.py index a33d13d..eb10a76 100644 --- a/LXMF/Handlers.py +++ b/LXMF/Handlers.py @@ -25,8 +25,8 @@ class LXMFDeliveryAnnounceHandler: try: stamp_cost = stamp_cost_from_app_data(app_data) - if stamp_cost != None: - self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost) + self.lxmrouter.update_stamp_cost(destination_hash, stamp_cost) + except Exception as e: RNS.log(f"An error occurred while trying to decode announced stamp cost. The contained exception was: {e}", RNS.LOG_ERROR) diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 9a82a13..49793cb 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1282,8 +1282,10 @@ class LXMRouter: destination_tickets = self.get_inbound_tickets(message.source_hash) if message.validate_stamp(required_stamp_cost, tickets=destination_tickets): message.stamp_valid = True + message.stamp_checked = True else: message.stamp_valid = False + message.stamp_checked = True if not message.stamp_valid: if self._enforce_stamps: diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 86b637d..adbac64 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -46,6 +46,7 @@ class LXMessage: TICKET_GRACE = 5*24*60*60 TICKET_RENEW = 14*24*60*60 TICKET_INTERVAL = 1*24*60*60 + COST_TICKET = 0x100 # LXMF overhead is 111 bytes per message: # 16 bytes for destination hash @@ -145,7 +146,9 @@ class LXMessage: self.stamp = None self.stamp_cost = stamp_cost + self.stamp_value = None self.stamp_valid = False + self.stamp_checked = False self.defer_stamp = True self.outbound_ticket = None self.include_ticket = include_ticket @@ -162,6 +165,7 @@ class LXMessage: self.delivery_attempts = 0 self.transport_encrypted = False self.transport_encryption = None + self.ratchet_id = None self.packet_representation = None self.resource_representation = None self.__delivery_destination = None @@ -272,17 +276,31 @@ class LXMessage: else: return True + @staticmethod + def stamp_value(material): + bits = 256 + value = 0 + i = int.from_bytes(material) + while ((i & (1 << (bits - 1))) == 0): + i = (i << 1) + value += 1 + + return value + def validate_stamp(self, target_cost, tickets=None): if tickets != None: for ticket in tickets: if self.stamp == RNS.Identity.truncated_hash(ticket+self.message_id): RNS.log(f"Stamp on {self} validated by inbound ticket", RNS.LOG_DEBUG) # TODO: Remove at some point + self.stamp_value = LXMessage.COST_TICKET return True if self.stamp == None: return False else: - if LXMessage.stamp_valid(self.stamp, target_cost, LXMessage.stamp_workblock(self.message_id)): + workblock = LXMessage.stamp_workblock(self.message_id) + if LXMessage.stamp_valid(self.stamp, target_cost, workblock): + self.stamp_value = LXMessage.stamp_value(RNS.Identity.full_hash(workblock+self.stamp)) return True else: return False @@ -314,8 +332,8 @@ class LXMessage: total_rounds = 0 if not RNS.vendor.platformutils.is_android(): - mp_debug = True - + allow_kill = True + stamp = None jobs = multiprocessing.cpu_count() stop_event = multiprocessing.Event() result_queue = multiprocessing.Queue(1) @@ -345,7 +363,7 @@ class LXMessage: job_procs = [] RNS.log(f"Starting {jobs} workers", RNS.LOG_DEBUG) # TODO: Remove for jpn in range(jobs): - process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": self.stamp_cost, "wb": workblock},) + process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event, "pn": jpn, "sc": self.stamp_cost, "wb": workblock}, daemon=True) job_procs.append(process) process.start() @@ -379,7 +397,13 @@ class LXMessage: if not all_exited: RNS.log("Stamp generation IPC timeout, possible worker deadlock", RNS.LOG_ERROR) - return None + if allow_kill: + for j in range(jobs): + process = job_procs[j] + process.kill() + return stamp + else: + return None else: for j in range(jobs):