diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 64677a8..5c71f93 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -1234,6 +1234,30 @@ class LXMRouter: else: return False + def cancel_outbound(self, message_id): + try: + lxmessage = None + for lxm in self.pending_outbound: + if lxm.message_id == message_id: + lxmessage = lxm + + if message_id in self.pending_deferred_stamps: + RNS.log(f"Cancelling deferred stamp generation for {lxmessage}", RNS.LOG_DEBUG) + + if lxmessage != None: + lxmessage.state = LXMessage.CANCELLED + if lxmessage in self.pending_outbound: + RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG) + if lxmessage.representation == LXMessage.RESOURCE: + if lxmessage.resource_representation != None: + lxmessage.resource_representation.cancel() + + self.process_outbound() + + except Exception as e: + RNS.log(f"An error occurred while cancelling {lxmessage}: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + def handle_outbound(self, lxmessage): destination_hash = lxmessage.get_destination().hash @@ -1780,10 +1804,15 @@ class LXMRouter: self.pending_outbound.append(selected_lxm) RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG) else: - RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) - selected_lxm.stamp_generation_failed = True - self.pending_deferred_stamps.pop(selected_message_id) - self.fail_message(selected_lxm) + if selected_lxm.state == LXMessage.CANCELLED: + RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_ERROR) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + else: + RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR) + selected_lxm.stamp_generation_failed = True + self.pending_deferred_stamps.pop(selected_message_id) + self.fail_message(selected_lxm) def process_outbound(self, sender = None): @@ -1820,8 +1849,14 @@ class LXMRouter: RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) self.pending_outbound.remove(lxmessage) + elif lxmessage.state == LXMessage.CANCELLED: + RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) + if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): + lxmessage.failed_callback(lxmessage) + else: - RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01 diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 253085b..316e798 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -16,8 +16,9 @@ class LXMessage: SENDING = 0x02 SENT = 0x04 DELIVERED = 0x08 + CANCELLED = 0xFE FAILED = 0xFF - states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, FAILED] + states = [GENERATING, OUTBOUND, SENDING, SENT, DELIVERED, CANCELLED, FAILED] UNKNOWN = 0x00 PACKET = 0x01 @@ -564,22 +565,24 @@ class LXMessage: if resource.status == RNS.Resource.COMPLETE: self.__mark_delivered() else: - resource.link.teardown() - self.state = LXMessage.OUTBOUND + if self.state != LXMessage.CANCELLED: + resource.link.teardown() + self.state = LXMessage.OUTBOUND def __propagation_resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: self.__mark_propagated() else: - resource.link.teardown() - self.state = LXMessage.OUTBOUND + if self.state != LXMessage.CANCELLED: + resource.link.teardown() + self.state = LXMessage.OUTBOUND def __link_packet_timed_out(self, packet_receipt): - if packet_receipt: - packet_receipt.destination.teardown() - - self.state = LXMessage.OUTBOUND - + if self.state != LXMessage.CANCELLED: + if packet_receipt: + packet_receipt.destination.teardown() + + self.state = LXMessage.OUTBOUND def __update_transfer_progress(self, resource): self.progress = 0.10 + (resource.get_progress()*0.90)