implement logic to send failed messages via propagation node

This commit is contained in:
liamcottle 2024-09-18 01:26:40 +12:00
commit af36d30fb4
2 changed files with 50 additions and 4 deletions

View file

@ -3,7 +3,7 @@ from datetime import datetime, timezone
from peewee import *
from playhouse.migrate import migrate as migrate_database, SqliteMigrator
latest_version = 3 # increment each time new database migrations are added
latest_version = 4 # increment each time new database migrations are added
database = DatabaseProxy() # use a proxy object, as we will init real db client inside meshchat.py
migrator = SqliteMigrator(database)
@ -26,6 +26,12 @@ def migrate(current_version):
migrator.add_column("lxmf_messages", 'quality', LxmfMessage.quality),
)
# migrate to version 4
if current_version < 4:
migrate_database(
migrator.add_column("lxmf_messages", 'method', LxmfMessage.method),
)
return latest_version
@ -73,6 +79,7 @@ class LxmfMessage(BaseModel):
state = CharField() # state is converted from internal int to a human friendly string
progress = FloatField() # progress is converted from internal float 0.00-1.00 to float between 0.00/100 (2 decimal places)
is_incoming = BooleanField() # if true, we should ignore state, it's set to draft by default on incoming messages
method = CharField(null=True) # what method is being used to send the message, e.g: direct, propagated
delivery_attempts = IntegerField(default=0) # how many times delivery has been attempted for this message
next_delivery_attempt_at = FloatField(null=True) # timestamp of when the message will attempt delivery again
title = TextField()

View file

@ -1474,6 +1474,7 @@ class ReticulumMeshChat:
"is_incoming": lxmf_message.incoming,
"state": self.convert_lxmf_state_to_string(lxmf_message),
"progress": progress_percentage,
"method": self.convert_lxmf_method_to_string(lxmf_message),
"delivery_attempts": lxmf_message.delivery_attempts,
"next_delivery_attempt_at": getattr(lxmf_message, "next_delivery_attempt", None), # attribute may not exist yet
"title": lxmf_message.title.decode('utf-8'),
@ -1505,6 +1506,22 @@ class ReticulumMeshChat:
return lxmf_message_state
# convert lxmf method to a human friendly string
def convert_lxmf_method_to_string(self, lxmf_message: LXMF.LXMessage):
# convert method to string
lxmf_message_method = "unknown"
if lxmf_message.method == LXMF.LXMessage.OPPORTUNISTIC:
lxmf_message_method = "opportunistic"
elif lxmf_message.method == LXMF.LXMessage.DIRECT:
lxmf_message_method = "direct"
elif lxmf_message.method == LXMF.LXMessage.PROPAGATED:
lxmf_message_method = "propagated"
elif lxmf_message.method == LXMF.LXMessage.PAPER:
lxmf_message_method = "paper"
return lxmf_message_method
# convert database announce to a dictionary
def convert_db_announce_to_dict(self, announce: database.Announce):
@ -1559,9 +1576,30 @@ class ReticulumMeshChat:
# handle delivery failed for an outbound lxmf message
def on_lxmf_sending_failed(self, lxmf_message):
# just pass this on, we don't need to do anything special
# check if this failed message should fall back to sending via a propagation node
if lxmf_message.state == LXMF.LXMessage.FAILED and hasattr(lxmf_message, "try_propagation_on_fail") and lxmf_message.try_propagation_on_fail:
self.send_failed_message_via_propagation_node(lxmf_message)
# update state
self.on_lxmf_sending_state_updated(lxmf_message)
# sends a previously failed message via a propagation node
def send_failed_message_via_propagation_node(self, lxmf_message: LXMF.LXMessage):
# reset internal message state
lxmf_message.packed = None
lxmf_message.delivery_attempts = 0
if hasattr(lxmf_message, "next_delivery_attempt"):
del lxmf_message.next_delivery_attempt
# this message should now be sent via a propagation node
lxmf_message.desired_method = LXMF.LXMessage.PROPAGATED
lxmf_message.try_propagation_on_fail = False
# resend message
self.message_router.handle_outbound(lxmf_message)
# upserts the provided lxmf message to the database
def db_upsert_lxmf_message(self, lxmf_message: LXMF.LXMessage):
@ -1576,6 +1614,7 @@ class ReticulumMeshChat:
"is_incoming": lxmf_message_dict["is_incoming"],
"state": lxmf_message_dict["state"],
"progress": lxmf_message_dict["progress"],
"method": lxmf_message_dict["method"],
"delivery_attempts": lxmf_message_dict["delivery_attempts"],
"next_delivery_attempt_at": lxmf_message_dict["next_delivery_attempt_at"],
"title": lxmf_message_dict["title"],
@ -1719,9 +1758,9 @@ class ReticulumMeshChat:
# updates lxmf message in database and broadcasts to websocket until it's delivered, or it fails
async def handle_lxmf_message_progress(self, lxmf_message):
# FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered or failed
# FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered, sent or failed
# we also can't use on_lxmf_sending_state_updated method to do this, because of async/await issues...
while lxmf_message.state != LXMF.LXMessage.DELIVERED and lxmf_message.state != LXMF.LXMessage.FAILED:
while lxmf_message.state != LXMF.LXMessage.DELIVERED and lxmf_message.state != LXMF.LXMessage.SENT and lxmf_message.state != LXMF.LXMessage.FAILED:
# wait 1 second between sending updates
await asyncio.sleep(1)