diff --git a/RNS/Interfaces/BackboneInterface.py b/RNS/Interfaces/BackboneInterface.py index f9b2517..5cdd73e 100644 --- a/RNS/Interfaces/BackboneInterface.py +++ b/RNS/Interfaces/BackboneInterface.py @@ -146,6 +146,24 @@ class BackboneInterface(Interface): def ensure_epoll(): if not BackboneInterface.epoll: BackboneInterface.epoll = select.epoll() + @staticmethod + def add_listener(interface, bind_address, socket_type=socket.AF_INET): + BackboneInterface.ensure_epoll() + if socket_type == socket.AF_INET: + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(bind_address) + elif socket_type == socket.AF_UNIX: + server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server_socket.bind(bind_address) + else: raise TypeError(f"Invalid socket type {socket_type} for {interface}") + + server_socket.listen(1) + server_socket.setblocking(0) + BackboneInterface.listener_filenos[server_socket.fileno()] = (interface, server_socket) + BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN) + BackboneInterface.start() + @staticmethod def add_client_socket(client_socket, interface): BackboneInterface.ensure_epoll() @@ -156,14 +174,28 @@ class BackboneInterface(Interface): @staticmethod def register_in(fileno): # TODO: Remove debug - RNS.log(f"Registering EPOLL_IN for {fileno}", RNS.LOG_DEBUG) - BackboneInterface.epoll.register(fileno, select.EPOLLIN) + # RNS.log(f"Registering EPOLL_IN for {fileno}", RNS.LOG_DEBUG) + try: BackboneInterface.epoll.register(fileno, select.EPOLLIN) + except Exception as e: + RNS.log(f"An error occurred while registering EPOLL_IN for file descriptor {fileno}: {e}", RNS.LOG_ERROR) @staticmethod def deregister_fileno(fileno): # TODO: Remove debug - RNS.log(f"Deregistering {fileno}", RNS.LOG_DEBUG) - BackboneInterface.epoll.unregister(fileno) + # RNS.log(f"Deregistering {fileno}", RNS.LOG_DEBUG) + try: BackboneInterface.epoll.unregister(fileno) + except Exception as e: + RNS.log(f"An error occurred while deregistering file descriptor {fileno}: {e}", RNS.LOG_DEBUG) + + @staticmethod + def deregister_listeners(): + for fileno in BackboneInterface.listener_filenos: + owner_interface, server_socket = BackboneInterface.listener_filenos[fileno] + fileno = server_socket.fileno() + BackboneInterface.deregister_fileno(fileno) + server_socket.close() + + BackboneInterface.listener_filenos.clear() @staticmethod def tx_ready(interface): @@ -205,8 +237,8 @@ class BackboneInterface(Interface): try: written = client_socket.send(spawned_interface.transmit_buffer) except Exception as e: - RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR) written = 0 + if not spawned_interface.detached: RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR) BackboneInterface.deregister_fileno(fileno) try: client_socket.close() except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) @@ -243,28 +275,8 @@ class BackboneInterface(Interface): RNS.trace_exception(e) finally: - for owner_interface, serversocket in BackboneInterface.listener_filenos: - fileno = serversocket.fileno() - BackboneInterface.deregister_fileno(fileno) - serversocket.close() - - BackboneInterface.listener_filenos.clear() + BackboneInterface.deregister_listeners() - @staticmethod - def add_listener(interface, bind_address, socket_type=socket.AF_INET): - BackboneInterface.ensure_epoll() - if socket_type == socket.AF_INET: - server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(bind_address) - else: raise TypeError(f"Invalid socket type {socket_type} for {interface}") - - server_socket.listen(1) - server_socket.setblocking(0) - BackboneInterface.listener_filenos[server_socket.fileno()] = (interface, server_socket) - BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN) - BackboneInterface.start() - def incoming_connection(self, socket): RNS.log("Accepting incoming connection", RNS.LOG_VERBOSE) spawned_configuration = {"name": "Client on "+self.name, "target_host": None, "target_port": None} @@ -331,7 +343,7 @@ class BackboneInterface(Interface): if hasattr(listener_socket, "shutdown"): if callable(listener_socket.shutdown): try: listener_socket.shutdown(socket.SHUT_RDWR) - except Exception as e: RNS.log("Error while shutting down server for "+str(self)+": "+str(e)) + except Exception as e: RNS.log("Error while shutting down socket for "+str(self)+": "+str(e)) def __str__(self): if ":" in self.bind_ip: diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index ccaefb9..b5f08e2 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -55,13 +55,16 @@ class LocalClientInterface(Interface): RECONNECT_WAIT = 8 AUTOCONFIGURE_MTU = True - def __init__(self, owner, name, target_port = None, connected_socket=None): + def __init__(self, owner, name, target_port = None, connected_socket=None, socket_path=None): super().__init__() self.epoll_backend = False self.HW_MTU = 262144 self.online = False + if RNS.vendor.platformutils.is_linux(): self.socket_path = f"\0rns/{socket_path}" + else: self.socket_path = None + self.IN = True self.OUT = False self.socket = None @@ -82,10 +85,18 @@ class LocalClientInterface(Interface): self.target_ip = None self.target_port = None self.socket = connected_socket - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + if self.socket.family == socket.AF_INET: + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.is_connected_to_shared_instance = False + elif self.socket_path != None: + self.receives = True + self.target_ip = None + self.target_port = None + self.connect() + elif target_port != None: self.receives = True self.target_ip = "127.0.0.1" @@ -113,9 +124,14 @@ class LocalClientInterface(Interface): return False def connect(self): - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.socket.connect((self.target_ip, self.target_port)) + if self.socket_path != None: + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.socket.connect(self.socket_path) + + else: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.socket.connect((self.target_ip, self.target_port)) self.online = True self.is_connected_to_shared_instance = True @@ -319,12 +335,15 @@ class LocalClientInterface(Interface): class LocalServerInterface(Interface): AUTOCONFIGURE_MTU = True - def __init__(self, owner, bindport=None): + def __init__(self, owner, bindport=None, socket_path=None): super().__init__() self.epoll_backend = False self.online = False self.clients = 0 + if RNS.vendor.platformutils.is_linux(): self.socket_path = f"\0rns/{socket_path}" + else: self.socket_path = None + self.IN = True self.OUT = False self.name = "Reticulum" @@ -333,53 +352,73 @@ class LocalServerInterface(Interface): if RNS.vendor.platformutils.is_linux(): self.epoll_backend = True - if (bindport != None): + if socket_path != None and self.epoll_backend: + self.receives = True + self.bind_ip = None + self.bind_port = None + + self.owner = owner + self.is_local_shared_instance = True + BackboneInterface.add_listener(self, self.socket_path, socket_type=socket.AF_UNIX) + + elif bindport != None: self.receives = True self.bind_ip = "127.0.0.1" self.bind_port = bindport - def handlerFactory(callback): - def createHandler(*args, **keys): - return LocalInterfaceHandler(callback, *args, **keys) - return createHandler - self.owner = owner self.is_local_shared_instance = True address = (self.bind_ip, self.bind_port) - if self.epoll_backend: BackboneInterface.add_listener(self, address) else: + def handlerFactory(callback): + def createHandler(*args, **keys): + return LocalInterfaceHandler(callback, *args, **keys) + return createHandler + self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection)) self.server.daemon_threads = True thread = threading.Thread(target=self.server.serve_forever) thread.daemon = True thread.start() - self.announce_rate_target = None - self.announce_rate_grace = None - self.announce_rate_penalty = None + self.announce_rate_target = None + self.announce_rate_grace = None + self.announce_rate_penalty = None - self.bitrate = 1000*1000*1000 - self.online = True + self.bitrate = 1000*1000*1000 + self.online = True def incoming_connection(self, handler): if self.epoll_backend: - socket = handler - interface_name = str(str(socket.getpeername()[1])) - spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=socket) + client_socket = handler + if client_socket.family == socket.AF_INET: + interface_name = str(str(client_socket.getpeername()[1])) + elif client_socket.family == socket.AF_UNIX: + interface_name = f"{self.clients}@{self.socket_path}" + + spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=client_socket) spawned_interface.OUT = self.OUT spawned_interface.IN = self.IN - spawned_interface.socket = socket - spawned_interface.target_ip = socket.getpeername()[0] - spawned_interface.target_port = str(socket.getpeername()[1]) + spawned_interface.socket = client_socket spawned_interface.parent_interface = self spawned_interface.bitrate = self.bitrate + + if client_socket.family == socket.AF_INET: + spawned_interface.target_ip = client_socket.getpeername()[0] + spawned_interface.target_port = str(client_socket.getpeername()[1]) + + elif client_socket.family == socket.AF_UNIX: + spawned_interface.target_ip = None + spawned_interface.target_port = interface_name + spawned_interface.socket_path = self.socket_path + if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate RNS.Transport.interfaces.append(spawned_interface) RNS.Transport.local_client_interfaces.append(spawned_interface) + BackboneInterface.add_client_socket(client_socket, spawned_interface) self.clients += 1 - BackboneInterface.add_client_socket(socket, spawned_interface) return True else: @@ -407,7 +446,8 @@ class LocalServerInterface(Interface): if from_spawned: self.oa_freq_deque.append(time.time()) def __str__(self): - return "Shared Instance["+str(self.bind_port)+"]" + if self.socket_path: return "Shared Instance["+str(self.socket_path)+"]" + else: return "Shared Instance["+str(self.bind_port)+"]" class LocalInterfaceHandler(socketserver.BaseRequestHandler): def __init__(self, callback, *args, **keys): diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index e4b48bd..18e08ef 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -251,6 +251,7 @@ class Reticulum: self.local_interface_port = 37428 self.local_control_port = 37429 + self.local_socket_path = None self.share_instance = True self.rpc_listener = None self.rpc_key = None @@ -351,7 +352,8 @@ class Reticulum: try: interface = LocalInterface.LocalServerInterface( RNS.Transport, - self.local_interface_port + self.local_interface_port, + socket_path=self.local_socket_path ) interface.OUT = True if hasattr(Reticulum, "_force_shared_instance_bitrate"): @@ -377,7 +379,8 @@ class Reticulum: interface = LocalInterface.LocalClientInterface( RNS.Transport, "Local shared instance", - self.local_interface_port) + self.local_interface_port, + socket_path=self.local_socket_path) interface.target_port = self.local_interface_port interface.OUT = True if hasattr(Reticulum, "_force_shared_instance_bitrate"): @@ -428,6 +431,9 @@ class Reticulum: if option == "share_instance": value = self.config["reticulum"].as_bool(option) self.share_instance = value + if option == "instance_name": + value = self.config["reticulum"][option] + self.local_socket_path = value if option == "shared_instance_port": value = int(self.config["reticulum"][option]) self.local_interface_port = value diff --git a/RNS/Transport.py b/RNS/Transport.py index 277405a..56e03ea 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -29,6 +29,7 @@ import inspect import threading from time import sleep from .vendor import umsgpack as umsgpack +from RNS.Interfaces.BackboneInterface import BackboneInterface class Transport: """ @@ -2706,8 +2707,8 @@ class Transport: li.detach() RNS.log("Detaching shared instance", RNS.LOG_DEBUG) - if shared_instance_master != None: - shared_instance_master.detach() + if shared_instance_master != None: shared_instance_master.detach() + BackboneInterface.deregister_listeners() RNS.log("All interfaces detached", RNS.LOG_DEBUG)