diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 56ad297..2ff5d75 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -9,6 +9,7 @@ import RNS import RNS.vendor.umsgpack as msgpack from .LXMF import APP_NAME +from .LXMF import FIELD_TICKET from .LXMPeer import LXMPeer from .LXMessage import LXMessage @@ -105,8 +106,10 @@ class LXMRouter: self.locally_delivered_transient_ids = {} self.locally_processed_transient_ids = {} self.outbound_stamp_costs = {} + self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} self.cost_file_lock = threading.Lock() + self.ticket_file_lock = threading.Lock() if identity == None: identity = RNS.Identity() @@ -164,6 +167,31 @@ class LXMRouter: except Exception as e: RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + try: + if os.path.isfile(self.storagepath+"/available_tickets"): + with self.ticket_file_lock: + with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file: + data = available_tickets_file.read() + self.available_tickets = msgpack.unpackb(data) + if not type(self.available_tickets) == dict: + RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR) + self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}} + if not "outbound" in self.available_tickets: + RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR) + self.available_tickets["outbound"] = {} + if not "inbound" in self.available_tickets: + RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR) + self.available_tickets["inbound"] = {} + if not "last_deliveries" in self.available_tickets: + RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR) + self.available_tickets["last_deliveries"] = {} + + self.clean_available_tickets() + self.save_available_tickets() + + except Exception as e: + RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + atexit.register(self.exit_handler) job_thread = threading.Thread(target=self.jobloop) @@ -191,6 +219,10 @@ class LXMRouter: da_thread.start() def register_delivery_identity(self, identity, display_name = None, stamp_cost = None): + if len(self.delivery_destinations) != 0: + RNS.log("Currently only one delivery identity is supported per LXMF router instance", RNS.LOG_ERROR) + return None + if not os.path.isdir(self.ratchetpath): os.makedirs(self.ratchetpath) @@ -647,6 +679,61 @@ class LXMRouter: return weight + def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY): + now = time.time() + ticket = None + if destination_hash in self.available_tickets["last_deliveries"]: + last_delivery = self.available_tickets["last_deliveries"][destination_hash] + elapsed = now - last_delivery + if elapsed < LXMessage.TICKET_INTERVAL: + RNS.log(f"A ticket for {RNS.prettyhexrep(destination_hash)} was already delivered {RNS.prettytime(elapsed)} ago, not including another ticket yet", RNS.LOG_DEBUG) + return None + + if destination_hash in self.available_tickets["inbound"]: + for ticket in self.available_tickets["inbound"][destination_hash]: + ticket_entry = self.available_tickets["inbound"][destination_hash][ticket] + expires = ticket_entry[0]; validity_left = expires - now + if validity_left > LXMessage.TICKET_RENEW: + RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG) + return [expires, ticket] + + else: + self.available_tickets["inbound"][destination_hash] = {} + + RNS.log(f"No generated tickets for {RNS.prettyhexrep(destination_hash)} with enough validity found, generating a new one", RNS.LOG_DEBUG) + expires = now+expiry + ticket = os.urandom(LXMessage.TICKET_LENGTH) + self.available_tickets["inbound"][destination_hash][ticket] = [expires] + self.save_available_tickets() + + return [expires, ticket] + + def remember_ticket(self, destination_hash, ticket_entry): + expires = ticket_entry[0]-time.time() + RNS.log(f"Remembering ticket for {RNS.prettyhexrep(destination_hash)}, expires in {RNS.prettytime(expires)}", RNS.LOG_DEBUG) + self.available_tickets["outbound"][destination_hash] = [ticket_entry[0], ticket_entry[1]] + + def get_outbound_ticket(self, destination_hash): + if destination_hash in self.available_tickets["outbound"]: + entry = self.available_tickets["outbound"][destination_hash] + if entry[0] > time.time(): + return entry[1] + + return None + + def get_inbound_tickets(self, destination_hash): + now = time.time() + available_tickets = [] + if destination_hash in self.available_tickets["inbound"]: + for inbound_ticket in self.available_tickets["inbound"][destination_hash]: + if now < self.available_tickets["inbound"][destination_hash][inbound_ticket][0]: + available_tickets.append(inbound_ticket) + + if len(available_tickets) == 0: + return None + else: + return available_tickets + def get_size(self, transient_id): lxm_size = self.propagation_entries[transient_id][3] return lxm_size @@ -778,13 +865,57 @@ class LXMRouter: if not os.path.isdir(self.storagepath): os.makedirs(self.storagepath) - locally_processed_file = open(self.storagepath+"/outbound_stamp_costs", "wb") - locally_processed_file.write(msgpack.packb(self.outbound_stamp_costs)) - locally_processed_file.close() + outbound_stamp_costs_file = open(self.storagepath+"/outbound_stamp_costs", "wb") + outbound_stamp_costs_file.write(msgpack.packb(self.outbound_stamp_costs)) + outbound_stamp_costs_file.close() except Exception as e: RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def clean_available_tickets(self): + try: + # Clean outbound tickets + expired_outbound = [] + for destination_hash in self.available_tickets["outbound"]: + entry = self.available_tickets["outbound"][destination_hash] + if time.time() > entry[0]: + expired_outbound.append(destination_hash) + + for destination_hash in expired_outbound: + RNS.log(f"Cleaning expired outbound ticket for {destination_hash}") # TODO: Remove + self.available_tickets["outbound"].pop(destination_hash) + + # Clean inbound tickets + for destination_hash in self.available_tickets["inbound"]: + expired_inbound = [] + for inbound_ticket in self.available_tickets["inbound"][destination_hash]: + entry = self.available_tickets["inbound"][destination_hash][inbound_ticket] + ticket_expiry = entry[0] + if time.time() > ticket_expiry+LXMessage.TICKET_GRACE: + expired_inbound.append(inbound_ticket) + + for inbound_ticket in expired_inbound: + RNS.log(f"Cleaning expired inbound ticket for {destination_hash}") # TODO: Remove + self.available_tickets["inbound"][destination_hash].pop(destination_hash) + + except Exception as e: + RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + def save_available_tickets(self): + with self.ticket_file_lock: + try: + RNS.log("Saving available tickets...", RNS.LOG_DEBUG) # TODO: Remove + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + available_tickets_file = open(self.storagepath+"/available_tickets", "wb") + available_tickets_file.write(msgpack.packb(self.available_tickets)) + available_tickets_file.close() + + except Exception as e: + RNS.log("Could not save available tickets to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def exit_handler(self): if self.propagation_node: try: @@ -1026,14 +1157,29 @@ class LXMRouter: return False def handle_outbound(self, lxmessage): + destination_hash = lxmessage.get_destination().hash if lxmessage.stamp_cost == None: - destination_hash = lxmessage.get_destination().hash if destination_hash in self.outbound_stamp_costs: stamp_cost = self.outbound_stamp_costs[destination_hash][1] lxmessage.stamp_cost = stamp_cost RNS.log(f"No stamp cost set on LXM to {RNS.prettyhexrep(destination_hash)}, autoconfigured to {stamp_cost}, as required by latest announce", RNS.LOG_DEBUG) lxmessage.state = LXMessage.OUTBOUND + + # If an outbound ticket is available for this + # destination, attach it to the message. + lxmessage.outbound_ticket = self.get_outbound_ticket(destination_hash) + if lxmessage.outbound_ticket != None and lxmessage.defer_stamp: + RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but outbound ticket was applied, processing immediately", RNS.LOG_DEBUG) + lxmessage.defer_stamp = False + + # If requested, include a ticket to allow the + # destination to reply without generating a stamp. + if lxmessage.include_ticket: + ticket = self.generate_ticket(lxmessage.destination_hash) + if ticket: + lxmessage.fields[FIELD_TICKET] = ticket + if not lxmessage.packed: lxmessage.pack() @@ -1073,9 +1219,23 @@ class LXMRouter: try: message = LXMessage.unpack_from_bytes(lxmf_data) + if message.signature_validated and FIELD_TICKET in message.fields: + ticket_entry = message.fields[FIELD_TICKET] + if type(ticket_entry) == list and len(ticket_entry) > 1: + expires = ticket_entry[0] + ticket = ticket_entry[1] + + if time.time() < expires: + if type(ticket) == bytes and len(ticket) == LXMessage.TICKET_LENGTH: + self.remember_ticket(message.source_hash, ticket_entry) + def save_job(): + self.save_available_tickets() + threading.Thread(target=save_job, daemon=True).start() + required_stamp_cost = self.delivery_destinations[message.destination_hash].stamp_cost if required_stamp_cost != None: - if message.validate_stamp(required_stamp_cost): + destination_tickets = self.get_inbound_tickets(message.source_hash) + if message.validate_stamp(required_stamp_cost, tickets=destination_tickets): message.stamp_valid = True else: message.stamp_valid = False @@ -1471,6 +1631,11 @@ class LXMRouter: if lxmessage.state == LXMessage.DELIVERED: RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) self.pending_outbound.remove(lxmessage) + if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields: + RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG) + self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time() + self.save_available_tickets() + elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT: RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) self.pending_outbound.remove(lxmessage) diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 3b4956d..14ff299 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -35,6 +35,17 @@ class LXMessage: DESTINATION_LENGTH = RNS.Identity.TRUNCATED_HASHLENGTH//8 SIGNATURE_LENGTH = RNS.Identity.SIGLENGTH//8 + TICKET_LENGTH = RNS.Identity.TRUNCATED_HASHLENGTH//8 + + # Default ticket expiry is 3 weeks, with an + # additional grace period of 5 days, allowing + # for timekeeping inaccuracies. Tickets will + # automatically renew when there is less than + # 14 days to expiry. + TICKET_EXPIRY = 21*24*60*60 + TICKET_GRACE = 5*24*60*60 + TICKET_RENEW = 14*24*60*60 + TICKET_INTERVAL = 3*24*60*60 # LXMF overhead is 111 bytes per message: # 16 bytes for destination hash @@ -93,8 +104,7 @@ class LXMessage: else: return "" - def __init__(self, destination, source, content = "", title = "", fields = None, desired_method = None, - destination_hash = None, source_hash = None, stamp_cost=None): + def __init__(self, destination, source, content = "", title = "", fields = None, desired_method = None, destination_hash = None, source_hash = None, stamp_cost=None, include_ticket=False): if isinstance(destination, RNS.Destination) or destination == None: self.__destination = destination @@ -114,25 +124,31 @@ class LXMessage: else: raise ValueError("LXMessage initialised with invalid source") + if title == None: + title = "" + self.set_title_from_string(title) self.set_content_from_string(content) self.set_fields(fields) - self.payload = None - self.timestamp = None - self.signature = None - self.hash = None - self.packed = None - self.stamp = None - self.stamp_cost = stamp_cost - self.stamp_valid = False - self.defer_stamp = False - self.state = LXMessage.GENERATING - self.method = LXMessage.UNKNOWN - self.progress = 0.0 - self.rssi = None - self.snr = None - self.q = None + self.payload = None + self.timestamp = None + self.signature = None + self.hash = None + self.packed = None + self.state = LXMessage.GENERATING + self.method = LXMessage.UNKNOWN + self.progress = 0.0 + self.rssi = None + self.snr = None + self.q = None + + self.stamp = None + self.stamp_cost = stamp_cost + self.stamp_valid = False + self.defer_stamp = False + self.outbound_ticket = None + self.include_ticket = include_ticket self.propagation_packed = None self.paper_packed = None @@ -254,7 +270,13 @@ class LXMessage: else: return True - def validate_stamp(self, target_cost): + def validate_stamp(self, target_cost, tickets=None): + if tickets != None: + for ticket in tickets: + if self.stamp == RNS.Identity.truncated_hash(ticket+self.message_id): + RNS.log(f"Stamp on {self} validated by inbound ticket", RNS.LOG_DEBUG) # TODO: Remove at some point + return True + if self.stamp == None: return False else: @@ -264,14 +286,25 @@ class LXMessage: return False def get_stamp(self, timeout=None): - if self.stamp_cost == None: + # If an outbound ticket exists, use this for + # generating a valid stamp. + if self.outbound_ticket != None and type(self.outbound_ticket) == bytes and len(self.outbound_ticket) == LXMessage.TICKET_LENGTH: + RNS.log(f"Generating stamp with outbound ticket for {self}", RNS.LOG_DEBUG) # TODO: Remove at some point + return RNS.Identity.truncated_hash(self.outbound_ticket+self.message_id) + + # If no stamp cost is required, we can just + # return immediately. + elif self.stamp_cost == None: return None + # If a stamp was already generated, return + # it immediately. elif self.stamp != None: - # TODO: Check that message hash cannot actually - # change under any circumstances before handoff return self.stamp + # Otherwise, we will need to generate a + # valid stamp according to the cost that + # the receiver has specified. else: RNS.log(f"Generating stamp with cost {self.stamp_cost} for {self}...", RNS.LOG_DEBUG) workblock = LXMessage.stamp_workblock(self.message_id) @@ -279,6 +312,7 @@ class LXMessage: total_rounds = 0 if not RNS.vendor.platformutils.is_android(): + RNS.log("Preparing IPC semaphores", RNS.LOG_DEBUG) # TODO: Remove stop_event = multiprocessing.Event() result_queue = multiprocessing.Queue(maxsize=1) rounds_queue = multiprocessing.Queue() @@ -305,14 +339,17 @@ class LXMessage: job_procs = [] jobs = multiprocessing.cpu_count() + RNS.log("Starting workers", RNS.LOG_DEBUG) # TODO: Remove for _ in range(jobs): process = multiprocessing.Process(target=job, kwargs={"stop_event": stop_event},) job_procs.append(process) process.start() + RNS.log("Awaiting results on queue", RNS.LOG_DEBUG) # TODO: Remove stamp = result_queue.get() stop_event.set() + RNS.log("Joining worker processes", RNS.LOG_DEBUG) # TODO: Remove for j in range(jobs): process = job_procs[j] process.join() @@ -354,21 +391,21 @@ class LXMessage: wm = multiprocessing.Manager() jobs = multiprocessing.cpu_count() - RNS.log(f"Dispatching {jobs} workers for stamp generation...") # TODO: Remove + # RNS.log(f"Dispatching {jobs} workers for stamp generation...") # TODO: Remove results_dict = wm.dict() while stamp == None: job_procs = [] def job(procnum=None, results_dict=None, wb=None): - RNS.log(f"Worker {procnum} starting...") # TODO: Remove + # RNS.log(f"Worker {procnum} starting...") # TODO: Remove rounds = 0 stamp = os.urandom(256//8) while not sv(stamp, self.stamp_cost, wb): if rounds >= 500: stamp = None - RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove + # RNS.log(f"Worker {procnum} found no result in {rounds} rounds") # TODO: Remove break stamp = os.urandom(256//8) @@ -386,17 +423,24 @@ class LXMessage: for j in results_dict: r = results_dict[j] - RNS.log(f"Result from {r}: {r[1]} rounds, stamp: {r[0]}") # TODO: Remove + # RNS.log(f"Result from {r}: {r[1]} rounds, stamp: {r[0]}") # TODO: Remove total_rounds += r[1] if r[0] != None: stamp = r[0] - RNS.log(f"Found stamp: {stamp}") # TODO: Remove + # RNS.log(f"Found stamp: {stamp}") # TODO: Remove + + if stamp == None: + elapsed = time.time() - start_time + speed = total_rounds/elapsed + RNS.log(f"Stamp generation for {self} running. {total_rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG) duration = time.time() - start_time rounds = total_rounds + + speed = total_rounds/duration # TODO: Remove stats output - RNS.log(f"Stamp generated in {RNS.prettytime(duration)} / {rounds} rounds", RNS.LOG_DEBUG) + RNS.log(f"Stamp generated in {RNS.prettytime(duration)}, {rounds} rounds, {int(speed)} rounds per second", RNS.LOG_DEBUG) # RNS.log(f"Rounds per second {int(rounds/duration)}", RNS.LOG_DEBUG) # RNS.log(f"Stamp: {RNS.hexrep(stamp)}", RNS.LOG_DEBUG) # RNS.log(f"Resulting hash: {RNS.hexrep(RNS.Identity.full_hash(workblock+stamp))}", RNS.LOG_DEBUG) diff --git a/docs/example_receiver.py b/docs/example_receiver.py index 64f914a..7d3aa20 100644 --- a/docs/example_receiver.py +++ b/docs/example_receiver.py @@ -6,6 +6,7 @@ required_stamp_cost = 8 enforce_stamps = False def delivery_callback(message): + global my_lxmf_destination, router time_string = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.timestamp)) signature_string = "Signature is invalid, reason undetermined" if message.signature_validated: @@ -35,6 +36,12 @@ def delivery_callback(message): RNS.log("\t| Stamp : "+stamp_string) RNS.log("\t+---------------------------------------------------------------") + # Optionally, send a reply + # source = my_lxmf_destination + # dest = message.source + # lxm = LXMF.LXMessage(dest, source, "Reply", None, desired_method=LXMF.LXMessage.DIRECT, include_ticket=True) + # router.handle_outbound(lxm) + r = RNS.Reticulum() router = LXMF.LXMRouter(storagepath="./tmp1", enforce_stamps=enforce_stamps) diff --git a/docs/example_sender.py b/docs/example_sender.py index 3dc6494..bcb8d36 100644 --- a/docs/example_sender.py +++ b/docs/example_sender.py @@ -14,7 +14,7 @@ r = RNS.Reticulum() router = LXMF.LXMRouter(storagepath="./tmp2") router.register_delivery_callback(delivery_callback) ident = RNS.Identity() -source = router.register_delivery_identity(ident, display_name=random_names[random.randint(0,len(random_names)-1)]) +source = router.register_delivery_identity(ident, display_name=random_names[random.randint(0,len(random_names)-1)], stamp_cost=8) router.announce(source.hash) RNS.log("Source announced") @@ -42,14 +42,14 @@ while True: lxm = LXMF.LXMessage(dest, source, random_msgs[random.randint(0,len(random_msgs)-1)], random_titles[random.randint(0,len(random_titles)-1)], - desired_method=LXMF.LXMessage.DIRECT) + desired_method=LXMF.LXMessage.DIRECT, include_ticket=True) # Or, create an oppertunistic, single-packet message # for sending without first establishing a link: # lxm = LXMF.LXMessage(dest, source, "This is a test", # random_titles[random.randint(0,len(random_titles)-1)], - # desired_method=LXMF.LXMessage.OPPORTUNISTIC) + # desired_method=LXMF.LXMessage.OPPORTUNISTIC, include_ticket=True) # Or, try sending the message via a propagation node: