#!/usr/bin/env python import argparse import http import json import mimetypes import os import time from typing import Callable import RNS import LXMF import asyncio import websockets import base64 class ReticulumWebChat: def __init__(self, webchat_config_file, reticulum_config_dir, identity: RNS.Identity): # default values before loading config self.display_name = "Anonymous Peer" # load config self.config_file = webchat_config_file or "storage/config.json" self.load_config() # init reticulum self.reticulum = RNS.Reticulum(reticulum_config_dir) self.identity = identity # init lxmf router self.message_router = LXMF.LXMRouter(identity=self.identity, storagepath="storage/lxmf") # register lxmf identity self.local_lxmf_destination = self.message_router.register_delivery_identity(self.identity) # set a callback for when an lxmf message is received self.message_router.register_delivery_callback(self.on_lxmf_delivery) # set a callback for when an lxmf announce is received RNS.Transport.register_announce_handler(LXMFAnnounceHandler(self.on_lxmf_announce_received)) # remember websocket clients self.websocket_clients = [] def load_config(self): # default config config = { } # attempt to load config and override default values try: with open(self.config_file, 'r') as f: custom_config = json.load(f) config |= custom_config # config is broken, fallback to defaults except: print("failed to load config, defaults will be used") # update display name from config if "display_name" in config: self.display_name = config["display_name"] # return loaded config return config def save_config(self): # build config config = { "display_name": self.display_name, } # attempt to save config try: with open(self.config_file, 'w') as f: json.dump(config, f, indent=4) # config is broken, fallback to defaults except: print("failed to save config") async def run(self, host, port): # start websocket server async with websockets.serve(self.on_websocket_client_connected, host, port, process_request=self.process_request): print("ReticulumWebChat server running at http://{}:{}".format(host, port)) await asyncio.Future() # run forever # handle serving custom http paths async def process_request(self, path, request_headers): # serve index.html if path == "/": with open("public/index.html") as f: file_content = f.read() return http.HTTPStatus.OK, [ ('Content-Type', 'text/html') ], file_content.encode("utf-8") # serve anything in public folder public_file_path = os.path.join("public", path.lstrip("/")) if os.path.isfile(public_file_path): mime_type, _ = mimetypes.guess_type(public_file_path) with open(public_file_path, "rb") as f: file_content = f.read() return http.HTTPStatus.OK, [ ('Content-Type', mime_type) ], file_content # by default, websocket is always served, but we only want it to be available at /ws # so we will return 404 for everything other than /ws if path != "/ws": return http.HTTPStatus.NOT_FOUND, [ ('Content-Type', 'text/html') ], b"Not Found" # handle new client connecting to websocket async def on_websocket_client_connected(self, client): # add client to connected clients list self.websocket_clients.append(client) # send config to all clients await self.send_config_to_websocket_clients() # send known peers to all clients await self.send_known_peers_to_websocket_clients() # handle client messages until disconnected while True: try: message = await client.recv() data = json.loads(message) await self.on_websocket_data_received(client, data) except (websockets.ConnectionClosed, websockets.ConnectionClosedOK, websockets.ConnectionClosedError): # client disconnected, we can stop looping break except Exception as e: # ignore errors while handling message print("failed to process client message") print(e) # loop finished, client is no longer connected self.websocket_clients.remove(client) # handle data received from websocket client async def on_websocket_data_received(self, client, data): # get type from client data _type = data["type"] # handle updating config if _type == "config.set": # send lxmf message to destination config = data["config"] # update display name in state if "display_name" in config and config["display_name"] != "": self.display_name = config["display_name"] print("updated display name to: " + self.display_name) # save config self.save_config() # send config to websocket clients await self.send_config_to_websocket_clients() # handle sending an lxmf message elif _type == "lxmf.delivery": # send lxmf message to destination destination_hash = data["destination_hash"] message = data["message"] await self.send_message(destination_hash, message) # # TODO: send response to client when marked as delivered? # await client.send(json.dumps({ # "type": "lxmf.sent", # })) # handle sending an announce elif _type == "announce": # send announce for lxmf self.local_lxmf_destination.announce(app_data=self.display_name.encode("utf-8")) # handle downloading a file from a nomadnet node elif _type == "nomadnet.file.download": # get data from websocket client destination_hash = data["nomadnet_file_download"]["destination_hash"] file_path = data["nomadnet_file_download"]["file_path"] # convert destination hash to bytes destination_hash = bytes.fromhex(destination_hash) # handle successful file download def on_file_download_success(file_name, file_bytes): asyncio.run(client.send(json.dumps({ "type": "nomadnet.file.download", "nomadnet_file_download": { "status": "success", "destination_hash": destination_hash.hex(), "file_path": file_path, "file_name": file_name, "file_bytes": base64.b64encode(file_bytes).decode("utf-8"), }, }))) # handle file download failure def on_file_download_failure(failure_reason): asyncio.run(client.send(json.dumps({ "type": "nomadnet.file.download", "nomadnet_file_download": { "status": "error", "failure_reason": failure_reason, "destination_hash": destination_hash.hex(), "file_path": file_path, }, }))) # todo: handle file download progress # download the file NomadnetFileDownloader(destination_hash, file_path, on_file_download_success, on_file_download_failure) # handle downloading a page from a nomadnet node elif _type == "nomadnet.page.download": # get data from websocket client destination_hash = data["nomadnet_page_download"]["destination_hash"] page_path = data["nomadnet_page_download"]["page_path"] # convert destination hash to bytes destination_hash = bytes.fromhex(destination_hash) # handle successful page download def on_page_download_success(page_content): asyncio.run(client.send(json.dumps({ "type": "nomadnet.page.download", "nomadnet_page_download": { "status": "success", "destination_hash": destination_hash.hex(), "page_path": page_path, "page_content": page_content, }, }))) # handle page download failure def on_page_download_failure(failure_reason): asyncio.run(client.send(json.dumps({ "type": "nomadnet.page.download", "nomadnet_page_download": { "status": "error", "failure_reason": failure_reason, "destination_hash": destination_hash.hex(), "page_path": page_path, }, }))) # todo: handle page download progress # download the page NomadnetPageDownloader(destination_hash, page_path, on_page_download_success, on_page_download_failure) # unhandled type else: print("unhandled client message type: " + _type) # broadcast provided data to all connected websocket clients async def websocket_broadcast(self, data): for websocket_client in self.websocket_clients: await websocket_client.send(data) # broadcasts config to all websocket clients async def send_config_to_websocket_clients(self): await self.websocket_broadcast(json.dumps({ "type": "config", "config": { "display_name": self.display_name, "identity_hash": self.identity.hexhash, "lxmf_address_hash": self.local_lxmf_destination.hexhash, }, })) # broadcasts known peers to all websocket clients async def send_known_peers_to_websocket_clients(self): # process known peers known_peers = [] for destination_hash in RNS.Identity.known_destinations: known_destination = RNS.Identity.known_destinations[destination_hash] last_announce_timestamp = known_destination[0] known_peers.append({ "destination_hash": destination_hash.hex(), "app_data": self.convert_app_data_to_string(RNS.Identity.recall_app_data(destination_hash)), "last_announce_timestamp": last_announce_timestamp, }) # send known peers to websocket clients await self.websocket_broadcast(json.dumps({ "type": "known_peers", "known_peers": known_peers, })) # convert app data to string, or return none unable to do so def convert_app_data_to_string(self, app_data): # attempt to convert to utf-8 string if app_data is not None: try: return app_data.decode("utf-8") except: # ignore failure to convert to string pass # unable to convert to string return None # convert an lxmf message to a dictionary, for sending over websocket def convert_lxmf_message_to_dict(self, lxmf_message: LXMF.LXMessage): # handle fields fields = {} message_fields = lxmf_message.get_fields() for field_type in message_fields: value = message_fields[field_type] # handle file attachments field if field_type == LXMF.FIELD_FILE_ATTACHMENTS: # process file attachments file_attachments = [] for file_attachment in value: file_name = file_attachment[0] file_bytes = base64.b64encode(file_attachment[1]).decode("utf-8") file_attachments.append({ "file_name": file_name, "file_bytes": file_bytes, }) # add to fields fields["file_attachments"] = file_attachments # handle image field if field_type == LXMF.FIELD_IMAGE: image_type = value[0] image_bytes = base64.b64encode(value[1]).decode("utf-8") fields["image"] = { "image_type": image_type, "image_bytes": image_bytes, } return { "hash": lxmf_message.hash.hex(), "source_hash": lxmf_message.source_hash.hex(), "destination_hash": lxmf_message.destination_hash.hex(), "state": self.convert_lxmf_state_to_string(lxmf_message), "progress": lxmf_message.progress, "content": lxmf_message.content.decode('utf-8'), "fields": fields, } # convert lxmf state to a human friendly string def convert_lxmf_state_to_string(self, lxmf_message: LXMF.LXMessage): # convert state to string lxmf_message_state = "unknown" if lxmf_message.state == LXMF.LXMessage.DRAFT: lxmf_message_state = "draft" elif lxmf_message.state == LXMF.LXMessage.OUTBOUND: lxmf_message_state = "outbound" elif lxmf_message.state == LXMF.LXMessage.SENDING: lxmf_message_state = "sending" elif lxmf_message.state == LXMF.LXMessage.SENT: lxmf_message_state = "sent" elif lxmf_message.state == LXMF.LXMessage.DELIVERED: lxmf_message_state = "delivered" elif lxmf_message.state == LXMF.LXMessage.FAILED: lxmf_message_state = "failed" return lxmf_message_state # handle an lxmf delivery from reticulum # NOTE: cant be async, as Reticulum doesn't await it def on_lxmf_delivery(self, message): try: # get message data message_content = message.content.decode('utf-8') source_hash_text = RNS.hexrep(message.source_hash, delimit=False) fields = {} message_fields = message.get_fields() for field_type in message_fields: value = message_fields[field_type] # handle file attachments field if field_type == LXMF.FIELD_FILE_ATTACHMENTS: # process file attachments file_attachments = [] for file_attachment in value: file_name = file_attachment[0] file_bytes = base64.b64encode(file_attachment[1]).decode("utf-8") file_attachments.append({ "file_name": file_name, "file_bytes": file_bytes, }) # add to fields fields["file_attachments"] = file_attachments # handle image field if field_type == LXMF.FIELD_IMAGE: image_type = value[0] image_bytes = base64.b64encode(value[1]).decode("utf-8") fields["image"] = { "image_type": image_type, "image_bytes": image_bytes, } # send received lxmf message data to all websocket clients asyncio.run(self.websocket_broadcast(json.dumps({ "type": "lxmf.delivery", "lxmf_message": self.convert_lxmf_message_to_dict(message), }))) except Exception as e: # do nothing on error print("lxmf_delivery error: {}".format(e)) # handle delivery status update for an outbound lxmf message def on_lxmf_sending_state_updated(self, lxmf_message): # send lxmf message state to all websocket clients asyncio.run(self.websocket_broadcast(json.dumps({ "type": "lxmf_message_state_updated", "lxmf_message": self.convert_lxmf_message_to_dict(lxmf_message), }))) # handle delivery failed for an outbound lxmf message def on_lxmf_sending_failed(self, lxmf_message): # just pass this on, we don't need to do anything special self.on_lxmf_sending_state_updated(lxmf_message) # handle sending an lxmf message to reticulum async def send_message(self, destination_hash, message_content): try: # convert destination hash to bytes destination_hash = bytes.fromhex(destination_hash) # FIXME: can this be removed, and just rely on the router to check paths? # find destination identity from hash destination_identity = RNS.Identity.recall(destination_hash) if destination_identity is None: # we don't know the path/identity for this destination hash, we will request it RNS.Transport.request_path(destination_hash) # we have to bail out of sending, since we don't have the path yet return # create destination for recipients lxmf delivery address lxmf_destination = RNS.Destination(destination_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery") # create lxmf message lxmf_message = LXMF.LXMessage(lxmf_destination, self.local_lxmf_destination, message_content, desired_method=LXMF.LXMessage.DIRECT) lxmf_message.try_propagation_on_fail = True lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated) lxmf_message.register_failed_callback(self.on_lxmf_sending_failed) # send lxmf message to be routed to destination self.message_router.handle_outbound(lxmf_message) # send outbound lxmf message to websocket (after passing to router so hash is available) await self.websocket_broadcast(json.dumps({ "type": "lxmf_outbound_message_created", "lxmf_message": self.convert_lxmf_message_to_dict(lxmf_message), })) except: # FIXME send error to websocket? print("failed to send lxmf message") # handle an announce received from reticulum, for an lxmf address # NOTE: cant be async, as Reticulum doesn't await it def on_lxmf_announce_received(self, destination_hash, announced_identity, app_data): # log received announce RNS.log("Received an announce from " + RNS.prettyhexrep(destination_hash)) # parse app data parsed_app_data = None if app_data is not None: parsed_app_data = app_data.decode("utf-8") # send received lxmf announce to all websocket clients asyncio.run(self.websocket_broadcast(json.dumps({ "type": "announce", "announce": { "destination_hash": destination_hash.hex(), "app_data": parsed_app_data, "last_announce_timestamp": time.time(), }, }))) # an announce handler for lxmf.delivery aspect that just forwards to a provided callback class LXMFAnnounceHandler: def __init__(self, received_announce_callback): self.aspect_filter = "lxmf.delivery" self.received_announce_callback = received_announce_callback # we will just pass the received announce back to the provided callback def received_announce(self, destination_hash, announced_identity, app_data): try: # handle received announce self.received_announce_callback(destination_hash, announced_identity, app_data) except: # ignore failure to handle received announce pass class NomadnetDownloader: def __init__(self, destination_hash: bytes, path: str, on_download_success: Callable[[bytes], None], on_download_failure: Callable[[str], None], timeout: int|None = None, auto_download=True): self.app_name = "nomadnetwork" self.aspects = "node" self.destination_hash = destination_hash self.path = path self.timeout = timeout self.on_download_success = on_download_success self.on_download_failure = on_download_failure if auto_download: self.download() # setup link to destination and request download def download(self): # request path to destination RNS.Transport.request_path(self.destination_hash) # find existing identity identity = RNS.Identity.recall(self.destination_hash) if identity is None: self.on_download_failure("identity not found") return # create destination to nomadnet node destination = RNS.Destination( identity, RNS.Destination.OUT, RNS.Destination.SINGLE, self.app_name, self.aspects, ) # create link to destination RNS.Link(destination, established_callback=self.link_established) # link to destination was established, we should now request the download def link_established(self, link): # request download over link link.request( self.path, data=None, response_callback=self.on_response, failed_callback=self.on_failed, progress_callback=self.on_progress, timeout=self.timeout, ) # handle successful download def on_response(self, request_receipt): print("file_received") self.on_download_success(request_receipt.response) # handle failure def on_failed(self, request_receipt=None): self.on_download_failure("request_failed") # handle download progress def on_progress(self, request_receipt): print("response_progressed") print(request_receipt) class NomadnetPageDownloader(NomadnetDownloader): def __init__(self, destination_hash: bytes, page_path: str, on_page_download_success: Callable[[str], None], on_page_download_failure: Callable[[str], None], timeout: int|None = None, auto_download=True): self.on_page_download_success = on_page_download_success self.on_page_download_failure = on_page_download_failure super().__init__(destination_hash, page_path, self.on_download_success, self.on_download_failure, timeout, auto_download) # page download was successful, decode the response and send to provided callback def on_download_success(self, response_bytes): micron_markup_response = response_bytes.decode("utf-8") self.on_page_download_success(micron_markup_response) # page download failed, send error to provided callback def on_download_failure(self, failure_reason): self.on_page_download_failure(failure_reason) class NomadnetFileDownloader(NomadnetDownloader): def __init__(self, destination_hash: bytes, page_path: str, on_file_download_success: Callable[[str, bytes], None], on_file_download_failure: Callable[[str], None], timeout: int|None = None, auto_download=True): self.on_file_download_success = on_file_download_success self.on_file_download_failure = on_file_download_failure super().__init__(destination_hash, page_path, self.on_download_success, self.on_download_failure, timeout, auto_download) # file download was successful, decode the response and send to provided callback def on_download_success(self, response): file_name: str = response[0] file_data: bytes = response[1] self.on_file_download_success(file_name, file_data) # page download failed, send error to provided callback def on_download_failure(self, failure_reason): self.on_file_download_failure(failure_reason) async def main(): # parse command line args parser = argparse.ArgumentParser(description="ReticulumWebChat") parser.add_argument("--host", nargs='?', default="0.0.0.0", type=str, help="The address the web server should listen on.") parser.add_argument("--port", nargs='?', default="8000", type=int, help="The port the web server should listen on.") parser.add_argument("--identity-file", type=str, help="Path to a Reticulum Identity file to use as your LXMF address.") parser.add_argument("--identity-base64", type=str, help="A base64 encoded Reticulum Identity to use as your LXMF address.") parser.add_argument("--generate-identity-file", type=str, help="Generates and saves a new Reticulum Identity to the provided file path and then exits.") parser.add_argument("--generate-identity-base64", action='store_true', help="Outputs a randomly generated Reticulum Identity as base64 and then exits.") parser.add_argument("--reticulum-config-dir", type=str, help="Path to a Reticulum config directory for the RNS stack to use (e.g: ~/.reticulum)") parser.add_argument("--webchat-config-file", type=str, help="Path to a ReticulumWebChat config file for saving user preferences.") args = parser.parse_args() # util to generate reticulum identity and save to file without using rnid if args.generate_identity_file is not None: # do not overwrite existing files, otherwise user could lose existing keys if os.path.exists(args.generate_identity_file): print("DANGER: the provided identity file path already exists, not overwriting!") return # generate a new identity and save to provided file path identity = RNS.Identity(create_keys=True) with open(args.generate_identity_file, "wb") as file: file.write(identity.get_private_key()) print("A new Reticulum Identity has been saved to: {}".format(args.generate_identity_file)) return # util to generate reticulum identity as base64 without using rnid if args.generate_identity_base64 is True: identity = RNS.Identity(create_keys=True) print(base64.b64encode(identity.get_private_key()).decode("utf-8")) return # use provided identity, or fallback to a random one if args.identity_file is not None: identity = RNS.Identity(create_keys=False) identity.load(args.identity_file) print("Reticulum Identity has been loaded from file.") print(identity) elif args.identity_base64 is not None: identity = RNS.Identity(create_keys=False) identity.load_private_key(base64.b64decode(args.identity_base64)) print("Reticulum Identity has been loaded from base64.") print(identity) else: identity = RNS.Identity(create_keys=True) print("Reticulum Identity has been randomly generated.") print(identity) # init app reticulum_webchat = ReticulumWebChat(args.webchat_config_file, args.reticulum_config_dir, identity) await reticulum_webchat.run(args.host, args.port) if __name__ == "__main__": asyncio.run(main())