From a81c6787c708812f18d419c6725f2d97d04da8d0 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Fri, 7 Feb 2025 17:00:37 +1300 Subject: [PATCH] refactor websocket interfaces to use threading and implement detach --- .../interfaces/WebsocketClientInterface.py | 50 +++++++++++-------- .../interfaces/WebsocketServerInterface.py | 44 +++++++++------- 2 files changed, 57 insertions(+), 37 deletions(-) diff --git a/src/backend/interfaces/WebsocketClientInterface.py b/src/backend/interfaces/WebsocketClientInterface.py index 6dfecf2..5a7e74f 100644 --- a/src/backend/interfaces/WebsocketClientInterface.py +++ b/src/backend/interfaces/WebsocketClientInterface.py @@ -1,12 +1,10 @@ -import asyncio import threading +import time import RNS -import websockets from RNS.Interfaces.Interface import Interface -from websockets.asyncio.connection import Connection - -from src.backend.async_utils import AsyncUtils +from websockets.sync.client import connect +from websockets.sync.connection import Connection class WebsocketClientInterface(Interface): @@ -14,6 +12,8 @@ class WebsocketClientInterface(Interface): # TODO: required? DEFAULT_IFAC_SIZE = 16 + RECONNECT_DELAY_MILLIS = 5000 + def __str__(self): return f"WebsocketClientInterface[{self.name}/{self.target_host}:{self.target_port}]" @@ -49,7 +49,7 @@ class WebsocketClientInterface(Interface): # connect to websocket server if an existing connection was not provided self.websocket = websocket if self.websocket is None: - thread = threading.Thread(target=asyncio.run, args=(self.connect(),)) + thread = threading.Thread(target=self.connect) thread.daemon = True thread.start() @@ -73,43 +73,53 @@ class WebsocketClientInterface(Interface): # send to websocket server print(f"{self} process_outgoing: {data.hex()}") - AsyncUtils.run_async(self.websocket.send(data)) + self.websocket.send(data) # update sent bytes counter self.txb += len(data) # connect to the configured websocket server - async def connect(self): + def connect(self): + # do nothing if interface is detached + if self.detached: + return + + # connect to websocket server try: # todo: ws:// and wss:// support in config file? - async with websockets.connect(f"ws://{self.target_host}:{self.target_port}", max_size=None, compression=None) as websocket: - self.websocket = websocket - await self.read_loop() + self.websocket = connect(f"ws://{self.target_host}:{self.target_port}", max_size=None, compression=None) + self.read_loop() except Exception as e: RNS.log(f"{self} failed with error: {e}", RNS.LOG_ERROR) - # todo implement reconnect delay - await self.connect() + # auto reconnect after delay + time.sleep(self.RECONNECT_DELAY_MILLIS) + self.connect() - async def read_loop(self): + def read_loop(self): self.online = True try: - async for message in self.websocket: + for message in self.websocket: self.process_incoming(message) except Exception as e: RNS.log(f"{self} read loop error: {e}", RNS.LOG_ERROR) self.online = False - # todo implement def detach(self): - # todo mark as offline - # todo close websocket - # todo mark as detached - pass + + # mark as offline + self.online = False + + # close websocket + if self.websocket is not None: + self.websocket.close() + + # mark as detached + self.detached = True # set interface class RNS should use when importing this external interface interface_class = WebsocketClientInterface diff --git a/src/backend/interfaces/WebsocketServerInterface.py b/src/backend/interfaces/WebsocketServerInterface.py index 8ffcdc4..52b15d7 100644 --- a/src/backend/interfaces/WebsocketServerInterface.py +++ b/src/backend/interfaces/WebsocketServerInterface.py @@ -1,11 +1,11 @@ -import asyncio import threading import time import RNS -import websockets from RNS.Interfaces.Interface import Interface -from websockets import ServerConnection +from websockets.sync.server import Server +from websockets.sync.server import serve +from websockets.sync.server import ServerConnection from src.backend.interfaces.WebsocketClientInterface import WebsocketClientInterface @@ -15,6 +15,8 @@ class WebsocketServerInterface(Interface): # TODO: required? DEFAULT_IFAC_SIZE = 16 + RESTART_DELAY_MILLIS = 5000 + def __str__(self): return f"WebsocketServerInterface[{self.name}/{self.listen_ip}:{self.listen_port}]" @@ -29,7 +31,9 @@ class WebsocketServerInterface(Interface): self.HW_MTU = 262144 # 256KiB self.bitrate = 1_000_000_000 # 1Gbps self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL - self.spawned_interfaces = [] + + self.server: Server | None = None + self.spawned_interfaces: [WebsocketClientInterface] = [] # parse config ifconf = Interface.get_config_obj(configuration) @@ -49,7 +53,7 @@ class WebsocketServerInterface(Interface): self.listen_port = int(self.listen_port) # run websocket server - thread = threading.Thread(target=asyncio.run, args=(self.serve(),)) + thread = threading.Thread(target=self.serve) thread.daemon = True thread.start() @@ -78,10 +82,10 @@ class WebsocketServerInterface(Interface): def process_outgoing(self, data): pass - async def serve(self): + def serve(self): # handle new websocket client connections - async def on_websocket_client_connected(websocket: ServerConnection): + def on_websocket_client_connected(websocket: ServerConnection): # create new child interface RNS.log("Accepting incoming WebSocket connection", RNS.LOG_VERBOSE) @@ -118,27 +122,33 @@ class WebsocketServerInterface(Interface): self.spawned_interfaces.append(spawned_interface) # run read loop - await spawned_interface.read_loop() + spawned_interface.read_loop() # run websocket server try: - async with websockets.serve(on_websocket_client_connected, self.listen_ip, self.listen_port, compression=None) as server: + with serve(on_websocket_client_connected, self.listen_ip, self.listen_port, compression=None) as server: self.online = True - await server.serve_forever() + self.server = server + server.serve_forever() except Exception as e: RNS.log(f"{self} failed with error: {e}", RNS.LOG_ERROR) # websocket server is no longer running, let's restart it - # todo implement retry delay self.online = False - await self.serve() + time.sleep(self.RESTART_DELAY_MILLIS) + self.serve() - # todo implement def detach(self): - # todo mark as offline - # todo stop websocket server and all existing connections - # todo mark as detached - pass + + # mark as offline + self.online = False + + # stop websocket server + if self.server is not None: + self.server.shutdown() + + # mark as detached + self.detached = True # set interface class RNS should use when importing this external interface interface_class = WebsocketServerInterface