From 93b6104aef7f070a6c2e850384da400ee1d65ff1 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Tue, 21 Jan 2025 03:52:47 +1300 Subject: [PATCH] initial implementation of a WebsocketClientInterface and a WebsocketServerInterface for RNS --- .../interfaces/WebsocketClientInterface.py | 109 ++++++++++++++ .../interfaces/WebsocketServerInterface.py | 133 ++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 src/backend/interfaces/WebsocketClientInterface.py create mode 100644 src/backend/interfaces/WebsocketServerInterface.py diff --git a/src/backend/interfaces/WebsocketClientInterface.py b/src/backend/interfaces/WebsocketClientInterface.py new file mode 100644 index 0000000..a423cbe --- /dev/null +++ b/src/backend/interfaces/WebsocketClientInterface.py @@ -0,0 +1,109 @@ +import asyncio +import threading + +import RNS +import websockets +from RNS.Interfaces.Interface import Interface +from websockets.asyncio.connection import Connection + +from src.backend.async_utils import AsyncUtils + + +class WebsocketClientInterface(Interface): + + # TODO: required? + DEFAULT_IFAC_SIZE = 16 + + def __str__(self): + return f"WebsocketClientInterface[{self.name}/{self.target_host}:{self.target_port}]" + + def __init__(self, owner, configuration, websocket: Connection = None): + + super().__init__() + + self.owner = owner + + self.IN = True + self.OUT = False + self.HW_MTU = 262144 # 256KiB + self.bitrate = 1_000_000_000 # 1Gbps + self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL + + # parse config + ifconf = Interface.get_config_obj(configuration) + self.name = ifconf.get("name") + self.target_host = ifconf.get("target_host", None) + self.target_port = ifconf.get("target_port", None) + + # ensure target host is provided + if self.target_host is None: + raise SystemError(f"target_host is required for interface '{self.name}'") + + # ensure target port is provided + if self.target_port is None: + raise SystemError(f"target_port is required for interface '{self.name}'") + + # convert target port to int + self.target_port = int(self.target_port) + + # connect to websocket server if an existing connection was not provided + self.websocket = websocket + if self.websocket is None: + thread = threading.Thread(target=asyncio.run, args=(self.connect(),)) + thread.daemon = True + thread.start() + + # called when a full packet has been received over the websocket + def process_incoming(self, data): + + print(f"{self} process_incoming: {data.hex()}") + + # update received bytes counter + self.rxb += len(data) + + # send received data to transport instance + self.owner.inbound(data, self) + + # the running reticulum transport instance will call this method whenever the interface must transmit a packet + def process_outgoing(self, data): + + # do nothing if not online + if not self.online: + return + + # send to websocket server + print(f"{self} process_outgoing: {data.hex()}") + AsyncUtils.run_async(self.websocket.send(data)) + + # update sent bytes counter + self.txb += len(data) + + # connect to the configured websocket server + async def connect(self): + + try: + # todo: ws:// and wss:// support in config file? + async with websockets.connect(f"ws://{self.target_host}:{self.target_port}", max_size=None, compression=None) as websocket: + self.websocket = websocket + await self.read_loop() + except Exception as e: + RNS.log(f"{self} failed with error: {e}", RNS.LOG_ERROR) + + # todo implement reconnect delay + await self.connect() + + async def read_loop(self): + + self.online = True + + try: + async for message in self.websocket: + self.process_incoming(message) + except Exception as e: + RNS.log(f"{self} read loop error: {e}", RNS.LOG_ERROR) + + self.online = False + + +# set interface class RNS should use when importing this external interface +interface_class = WebsocketClientInterface diff --git a/src/backend/interfaces/WebsocketServerInterface.py b/src/backend/interfaces/WebsocketServerInterface.py new file mode 100644 index 0000000..14ce4f8 --- /dev/null +++ b/src/backend/interfaces/WebsocketServerInterface.py @@ -0,0 +1,133 @@ +import asyncio +import threading +import time + +import RNS +import websockets +from RNS.Interfaces.Interface import Interface +from websockets import ServerConnection + +from src.backend.interfaces.WebsocketClientInterface import WebsocketClientInterface + + +class WebsocketServerInterface(Interface): + + # TODO: required? + DEFAULT_IFAC_SIZE = 16 + + def __str__(self): + return f"WebsocketServerInterface[{self.name}/{self.listen_ip}:{self.listen_port}]" + + def __init__(self, owner, configuration): + + super().__init__() + + self.owner = owner + + self.IN = True + self.OUT = False + self.HW_MTU = 262144 # 256KiB + self.bitrate = 1_000_000_000 # 1Gbps + self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL + self.spawned_interfaces = [] + + # parse config + ifconf = Interface.get_config_obj(configuration) + self.name = ifconf.get("name") + self.listen_ip = ifconf.get("listen_ip", None) + self.listen_port = ifconf.get("listen_port", None) + + # ensure listen ip is provided + if self.listen_ip is None: + raise SystemError(f"listen_ip is required for interface '{self.name}'") + + # ensure listen port is provided + if self.listen_port is None: + raise SystemError(f"listen_port is required for interface '{self.name}'") + + # convert listen port to int + self.listen_port = int(self.listen_port) + + # run websocket server + thread = threading.Thread(target=asyncio.run, args=(self.serve(),)) + thread.daemon = True + thread.start() + + # todo docs + def received_announce(self, from_spawned=False): + if from_spawned: + self.ia_freq_deque.append(time.time()) + + # todo docs + def sent_announce(self, from_spawned=False): + if from_spawned: + self.oa_freq_deque.append(time.time()) + + # called when a full packet has been received from a websocket client + def process_incoming(self, data): + + print(f"{self} process_incoming: {data.hex()}") + + # Update our received bytes counter + self.rxb += len(data) + + # And send the data packet to the Transport instance for processing. + self.owner.inbound(data, self) + + # do nothing as the spawned child interface will take care of rx/tx + def process_outgoing(self, data): + pass + + async def serve(self): + + # handle new websocket client connections + async def on_websocket_client_connected(websocket: ServerConnection): + + # create new child interface + RNS.log("Accepting incoming WebSocket connection", RNS.LOG_VERBOSE) + spawned_interface = WebsocketClientInterface(self.owner, { + "name": f"Client on {self.name}", + "target_host": websocket.remote_address[0], + "target_port": str(websocket.remote_address[1]), + }, websocket=websocket) + + # configure child interface + spawned_interface.IN = self.IN + spawned_interface.OUT = self.OUT + spawned_interface.HW_MTU = self.HW_MTU + spawned_interface.bitrate = self.bitrate + spawned_interface.mode = self.mode + spawned_interface.parent_interface = self + spawned_interface.online = True + + # todo ifac? + # todo announce rates? + + # activate child interface + RNS.log(f"Spawned new WebsocketClientInterface: {spawned_interface}", RNS.LOG_VERBOSE) + RNS.Transport.interfaces.append(spawned_interface) + + # associate child interface with this interface + while spawned_interface in self.spawned_interfaces: + self.spawned_interfaces.remove(spawned_interface) + self.spawned_interfaces.append(spawned_interface) + + # run read loop + await spawned_interface.read_loop() + + # run websocket server + try: + async with websockets.serve(on_websocket_client_connected, self.listen_ip, self.listen_port, compression=None) as server: + self.online = True + await server.serve_forever() + except Exception as e: + RNS.log(f"{self} failed with error: {e}", RNS.LOG_ERROR) + + # websocket server is no longer running, let's restart it + # todo implement retry delay + self.online = False + await self.serve() + + +# set interface class RNS should use when importing this external interface +interface_class = WebsocketServerInterface