From af36d30fb4a48db1102fef23aef88090a8ac3f68 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Wed, 18 Sep 2024 01:26:40 +1200 Subject: [PATCH] implement logic to send failed messages via propagation node --- database.py | 9 ++++++++- meshchat.py | 45 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/database.py b/database.py index 1df9204..b7880b4 100644 --- a/database.py +++ b/database.py @@ -3,7 +3,7 @@ from datetime import datetime, timezone from peewee import * from playhouse.migrate import migrate as migrate_database, SqliteMigrator -latest_version = 3 # increment each time new database migrations are added +latest_version = 4 # increment each time new database migrations are added database = DatabaseProxy() # use a proxy object, as we will init real db client inside meshchat.py migrator = SqliteMigrator(database) @@ -26,6 +26,12 @@ def migrate(current_version): migrator.add_column("lxmf_messages", 'quality', LxmfMessage.quality), ) + # migrate to version 4 + if current_version < 4: + migrate_database( + migrator.add_column("lxmf_messages", 'method', LxmfMessage.method), + ) + return latest_version @@ -73,6 +79,7 @@ class LxmfMessage(BaseModel): state = CharField() # state is converted from internal int to a human friendly string progress = FloatField() # progress is converted from internal float 0.00-1.00 to float between 0.00/100 (2 decimal places) is_incoming = BooleanField() # if true, we should ignore state, it's set to draft by default on incoming messages + method = CharField(null=True) # what method is being used to send the message, e.g: direct, propagated delivery_attempts = IntegerField(default=0) # how many times delivery has been attempted for this message next_delivery_attempt_at = FloatField(null=True) # timestamp of when the message will attempt delivery again title = TextField() diff --git a/meshchat.py b/meshchat.py index 74e6071..807a43a 100644 --- a/meshchat.py +++ b/meshchat.py @@ -1474,6 +1474,7 @@ class ReticulumMeshChat: "is_incoming": lxmf_message.incoming, "state": self.convert_lxmf_state_to_string(lxmf_message), "progress": progress_percentage, + "method": self.convert_lxmf_method_to_string(lxmf_message), "delivery_attempts": lxmf_message.delivery_attempts, "next_delivery_attempt_at": getattr(lxmf_message, "next_delivery_attempt", None), # attribute may not exist yet "title": lxmf_message.title.decode('utf-8'), @@ -1505,6 +1506,22 @@ class ReticulumMeshChat: return lxmf_message_state + # convert lxmf method to a human friendly string + def convert_lxmf_method_to_string(self, lxmf_message: LXMF.LXMessage): + + # convert method to string + lxmf_message_method = "unknown" + if lxmf_message.method == LXMF.LXMessage.OPPORTUNISTIC: + lxmf_message_method = "opportunistic" + elif lxmf_message.method == LXMF.LXMessage.DIRECT: + lxmf_message_method = "direct" + elif lxmf_message.method == LXMF.LXMessage.PROPAGATED: + lxmf_message_method = "propagated" + elif lxmf_message.method == LXMF.LXMessage.PAPER: + lxmf_message_method = "paper" + + return lxmf_message_method + # convert database announce to a dictionary def convert_db_announce_to_dict(self, announce: database.Announce): @@ -1559,9 +1576,30 @@ class ReticulumMeshChat: # handle delivery failed for an outbound lxmf message def on_lxmf_sending_failed(self, lxmf_message): - # just pass this on, we don't need to do anything special + + # check if this failed message should fall back to sending via a propagation node + if lxmf_message.state == LXMF.LXMessage.FAILED and hasattr(lxmf_message, "try_propagation_on_fail") and lxmf_message.try_propagation_on_fail: + self.send_failed_message_via_propagation_node(lxmf_message) + + # update state self.on_lxmf_sending_state_updated(lxmf_message) + # sends a previously failed message via a propagation node + def send_failed_message_via_propagation_node(self, lxmf_message: LXMF.LXMessage): + + # reset internal message state + lxmf_message.packed = None + lxmf_message.delivery_attempts = 0 + if hasattr(lxmf_message, "next_delivery_attempt"): + del lxmf_message.next_delivery_attempt + + # this message should now be sent via a propagation node + lxmf_message.desired_method = LXMF.LXMessage.PROPAGATED + lxmf_message.try_propagation_on_fail = False + + # resend message + self.message_router.handle_outbound(lxmf_message) + # upserts the provided lxmf message to the database def db_upsert_lxmf_message(self, lxmf_message: LXMF.LXMessage): @@ -1576,6 +1614,7 @@ class ReticulumMeshChat: "is_incoming": lxmf_message_dict["is_incoming"], "state": lxmf_message_dict["state"], "progress": lxmf_message_dict["progress"], + "method": lxmf_message_dict["method"], "delivery_attempts": lxmf_message_dict["delivery_attempts"], "next_delivery_attempt_at": lxmf_message_dict["next_delivery_attempt_at"], "title": lxmf_message_dict["title"], @@ -1719,9 +1758,9 @@ class ReticulumMeshChat: # updates lxmf message in database and broadcasts to websocket until it's delivered, or it fails async def handle_lxmf_message_progress(self, lxmf_message): - # FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered or failed + # FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered, sent or failed # we also can't use on_lxmf_sending_state_updated method to do this, because of async/await issues... - while lxmf_message.state != LXMF.LXMessage.DELIVERED and lxmf_message.state != LXMF.LXMessage.FAILED: + while lxmf_message.state != LXMF.LXMessage.DELIVERED and lxmf_message.state != LXMF.LXMessage.SENT and lxmf_message.state != LXMF.LXMessage.FAILED: # wait 1 second between sending updates await asyncio.sleep(1)