migrate sending messages to an http api instead of websocket to improve error handling

This commit is contained in:
liamcottle 2024-07-08 23:54:51 +12:00
commit a4971c5ccd
2 changed files with 145 additions and 147 deletions

View file

@ -1424,21 +1424,6 @@
break;
}
case 'lxmf_outbound_message_created': {
// add outbound message to ui
this.chatItems.push({
"type": "lxmf_message",
"lxmf_message": json.lxmf_message,
"is_outbound": true,
});
// always scroll to bottom since we just sent a message
this.scrollMessagesToBottom();
break;
}
case 'lxmf_message_state_updated': {
@ -1593,12 +1578,6 @@
return;
}
// do nothing if not connected to websocket
if(!this.isWebsocketConnected){
this.alert("Not connected to WebSocket!");
return;
}
// do nothing if no peer selected
if(!this.selectedPeer){
return;
@ -1642,15 +1621,24 @@
};
}
// send message to reticulum via websocket
this.ws.send(JSON.stringify({
"type": "lxmf.delivery",
// send message to reticulum
const response = await window.axios.post(`/api/v1/lxmf-messages/send`, {
"lxmf_message": {
"destination_hash": this.selectedPeer.destination_hash,
"content": this.newMessageText,
"fields": fields,
},
}));
});
// add outbound message to ui
this.chatItems.push({
"type": "lxmf_message",
"lxmf_message": response.data.lxmf_message,
"is_outbound": true,
});
// always scroll to bottom since we just sent a message
this.scrollMessagesToBottom();
// clear message inputs
this.newMessageText = "";
@ -1663,16 +1651,15 @@
} catch(e) {
// todo handle error
console.error(e);
// show error
const message = e.response?.data?.message ?? "failed to send message";
this.alert(message);
console.log(e);
} finally {
this.isSendingMessage = false;
}
// scroll to bottom
this.scrollMessagesToBottom();
},
async retrySendingMessage(chatItem) {
@ -1687,22 +1674,34 @@
try {
// send message to reticulum via websocket
this.ws.send(JSON.stringify({
"type": "lxmf.delivery",
"lxmf_message": chatItem.lxmf_message,
}));
// send message to reticulum
const response = await window.axios.post(`/api/v1/lxmf-messages/send`, {
"lxmf_message": {
"destination_hash": chatItem.lxmf_message.destination_hash,
"content": chatItem.lxmf_message.content,
"fields": chatItem.lxmf_message.fields,
},
});
// add outbound message to ui
this.chatItems.push({
"type": "lxmf_message",
"lxmf_message": response.data.lxmf_message,
"is_outbound": true,
});
// always scroll to bottom since we just sent a message
this.scrollMessagesToBottom();
} catch(e) {
// todo handle error
console.error(e);
// show error
const message = e.response?.data?.message ?? "failed to send message";
this.alert(message);
console.log(e);
}
// scroll to bottom
this.scrollMessagesToBottom();
},
async updateConfig(config) {

213
web.py
View file

@ -796,6 +796,63 @@ class ReticulumMeshChat:
"path_table": path_table,
})
# send lxmf message
@routes.post("/api/v1/lxmf-messages/send")
async def index(request):
# get request body as json
data = await request.json()
# get data from json
destination_hash = data["lxmf_message"]["destination_hash"]
content = data["lxmf_message"]["content"]
fields = {}
if "fields" in data["lxmf_message"]:
fields = data["lxmf_message"]["fields"]
# parse image field
image_field = None
if "image" in fields:
image_type = data["lxmf_message"]["fields"]["image"]["image_type"]
image_bytes = base64.b64decode(data["lxmf_message"]["fields"]["image"]["image_bytes"])
image_field = LxmfImageField(image_type, image_bytes)
# parse audio field
audio_field = None
if "audio" in fields:
audio_mode = data["lxmf_message"]["fields"]["audio"]["audio_mode"]
audio_bytes = base64.b64decode(data["lxmf_message"]["fields"]["audio"]["audio_bytes"])
audio_field = LxmfAudioField(audio_mode, audio_bytes)
# parse file attachments field
file_attachments_field = None
if "file_attachments" in fields:
file_attachments = []
for file_attachment in data["lxmf_message"]["fields"]["file_attachments"]:
file_name = file_attachment["file_name"]
file_bytes = base64.b64decode(file_attachment["file_bytes"])
file_attachments.append(LxmfFileAttachment(file_name, file_bytes))
file_attachments_field = LxmfFileAttachmentsField(file_attachments)
try:
# send lxmf message to destination
lxmf_message = await self.send_message(destination_hash, content,
image_field=image_field,
audio_field=audio_field,
file_attachments_field=file_attachments_field)
return web.json_response({
"lxmf_message": self.convert_lxmf_message_to_dict(lxmf_message),
})
except Exception as e:
return web.json_response({
"message": "Sending Failed: {}".format(str(e)),
}, status=503)
# delete lxmf message
@routes.delete("/api/v1/lxmf-messages/{hash}")
async def index(request):
@ -1011,53 +1068,6 @@ class ReticulumMeshChat:
# send config to websocket clients
await self.send_config_to_websocket_clients()
# handle sending an lxmf message
elif _type == "lxmf.delivery":
# get data from websocket client
destination_hash = data["lxmf_message"]["destination_hash"]
content = data["lxmf_message"]["content"]
fields = {}
if "fields" in data["lxmf_message"]:
fields = data["lxmf_message"]["fields"]
# parse image field
image_field = None
if "image" in fields:
image_type = data["lxmf_message"]["fields"]["image"]["image_type"]
image_bytes = base64.b64decode(data["lxmf_message"]["fields"]["image"]["image_bytes"])
image_field = LxmfImageField(image_type, image_bytes)
# parse audio field
audio_field = None
if "audio" in fields:
audio_mode = data["lxmf_message"]["fields"]["audio"]["audio_mode"]
audio_bytes = base64.b64decode(data["lxmf_message"]["fields"]["audio"]["audio_bytes"])
audio_field = LxmfAudioField(audio_mode, audio_bytes)
# parse file attachments field
file_attachments_field = None
if "file_attachments" in fields:
file_attachments = []
for file_attachment in data["lxmf_message"]["fields"]["file_attachments"]:
file_name = file_attachment["file_name"]
file_bytes = base64.b64decode(file_attachment["file_bytes"])
file_attachments.append(LxmfFileAttachment(file_name, file_bytes))
file_attachments_field = LxmfFileAttachmentsField(file_attachments)
# send lxmf message to destination
await self.send_message(destination_hash, content,
image_field=image_field,
audio_field=audio_field,
file_attachments_field=file_attachments_field)
# # TODO: send response to client when marked as delivered?
# await client.send(json.dumps({
# "type": "lxmf.sent",
# }))
# handle sending an announce
elif _type == "announce":
await self.announce()
@ -1463,83 +1473,72 @@ class ReticulumMeshChat:
async def send_message(self, destination_hash, content: str,
image_field: LxmfImageField = None,
audio_field: LxmfAudioField = None,
file_attachments_field: LxmfFileAttachmentsField = None):
file_attachments_field: LxmfFileAttachmentsField = None) -> LXMF.LXMessage:
try:
# convert destination hash to bytes
destination_hash = bytes.fromhex(destination_hash)
# convert destination hash to bytes
destination_hash = bytes.fromhex(destination_hash)
# FIXME: can this be removed, and just rely on the router to check paths?
# find destination identity from hash
destination_identity = RNS.Identity.recall(destination_hash)
if destination_identity is None:
# FIXME: can this be removed, and just rely on the router to check paths?
# find destination identity from hash
destination_identity = RNS.Identity.recall(destination_hash)
if destination_identity is None:
# we don't know the path/identity for this destination hash, we will request it
RNS.Transport.request_path(destination_hash)
# we don't know the path/identity for this destination hash, we will request it
RNS.Transport.request_path(destination_hash)
# we have to bail out of sending, since we don't have the path yet
raise Exception("Destination identity is not known. Try again later.")
# we have to bail out of sending, since we don't have the path yet
# FIXME: we just ate the message, and didn't tell the user it failed...
return
# create destination for recipients lxmf delivery address
lxmf_destination = RNS.Destination(destination_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
# create destination for recipients lxmf delivery address
lxmf_destination = RNS.Destination(destination_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
# create lxmf message
lxmf_message = LXMF.LXMessage(lxmf_destination, self.local_lxmf_destination, content, desired_method=LXMF.LXMessage.DIRECT)
lxmf_message.try_propagation_on_fail = True
# create lxmf message
lxmf_message = LXMF.LXMessage(lxmf_destination, self.local_lxmf_destination, content, desired_method=LXMF.LXMessage.DIRECT)
lxmf_message.try_propagation_on_fail = True
lxmf_message.fields = {}
lxmf_message.fields = {}
# add file attachments field
if file_attachments_field is not None:
# add file attachments field
if file_attachments_field is not None:
# create array of [[file_name, file_bytes], [file_name, file_bytes], ...]
file_attachments = []
for file_attachment in file_attachments_field.file_attachments:
file_attachments.append([file_attachment.file_name, file_attachment.file_bytes])
# create array of [[file_name, file_bytes], [file_name, file_bytes], ...]
file_attachments = []
for file_attachment in file_attachments_field.file_attachments:
file_attachments.append([file_attachment.file_name, file_attachment.file_bytes])
# set field attachments field
lxmf_message.fields[LXMF.FIELD_FILE_ATTACHMENTS] = file_attachments
# set field attachments field
lxmf_message.fields[LXMF.FIELD_FILE_ATTACHMENTS] = file_attachments
# add image field
if image_field is not None:
lxmf_message.fields[LXMF.FIELD_IMAGE] = [
image_field.image_type,
image_field.image_bytes,
]
# add image field
if image_field is not None:
lxmf_message.fields[LXMF.FIELD_IMAGE] = [
image_field.image_type,
image_field.image_bytes,
]
# add audio field
if audio_field is not None:
lxmf_message.fields[LXMF.FIELD_AUDIO] = [
audio_field.audio_mode,
audio_field.audio_bytes,
]
# add audio field
if audio_field is not None:
lxmf_message.fields[LXMF.FIELD_AUDIO] = [
audio_field.audio_mode,
audio_field.audio_bytes,
]
# register delivery callbacks
lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated)
lxmf_message.register_failed_callback(self.on_lxmf_sending_failed)
# register delivery callbacks
lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated)
lxmf_message.register_failed_callback(self.on_lxmf_sending_failed)
# send lxmf message to be routed to destination
self.message_router.handle_outbound(lxmf_message)
# send lxmf message to be routed to destination
self.message_router.handle_outbound(lxmf_message)
# upsert lxmf message to database
self.db_upsert_lxmf_message(lxmf_message)
# upsert lxmf message to database
self.db_upsert_lxmf_message(lxmf_message)
# handle lxmf message progress loop without blocking or awaiting
# otherwise other incoming websocket packets will not be processed until sending is complete
# which results in the next message not showing up until the first message is finished
asyncio.create_task(self.handle_lxmf_message_progress(lxmf_message))
# send outbound lxmf message to websocket (after passing to router so hash is available)
await self.websocket_broadcast(json.dumps({
"type": "lxmf_outbound_message_created",
"lxmf_message": self.convert_lxmf_message_to_dict(lxmf_message),
}))
# handle lxmf message progress loop without blocking or awaiting
# otherwise other incoming websocket packets will not be processed until sending is complete
# which results in the next message not showing up until the first message is finished
asyncio.create_task(self.handle_lxmf_message_progress(lxmf_message))
except:
# FIXME send error to websocket?
print("failed to send lxmf message")
return lxmf_message
# updates lxmf message in database and broadcasts to websocket until it's delivered, or it fails
async def handle_lxmf_message_progress(self, lxmf_message):