diff --git a/sbapp/main.py b/sbapp/main.py index 85f4a7f..5b06e44 100644 --- a/sbapp/main.py +++ b/sbapp/main.py @@ -5464,7 +5464,7 @@ class SidebandApp(MDApp): self.telemetry_info_dialog.dismiss() ok_button.bind(on_release=dl_ok) - result = self.sideband.request_latest_telemetry(from_addr=self.sideband.config["telemetry_collector"]) + result = self.sideband.request_latest_telemetry(from_addr=self.sideband.config["telemetry_collector"], is_collector_request=True) if result == "no_address": title_str = "Invalid Address" diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index 7a17b8a..6a288a9 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -177,6 +177,7 @@ class SidebandCore(): self.latest_telemetry = None self.latest_packed_telemetry = None self.telemetry_changes = 0 + self.telemetry_response_excluded = [] self.pending_telemetry_send = False self.pending_telemetry_send_try = 0 self.pending_telemetry_send_maxtries = 2 @@ -253,13 +254,15 @@ class SidebandCore(): if not os.path.isdir(self.app_dir+"/app_storage"): os.makedirs(self.app_dir+"/app_storage") - self.config_path = self.app_dir+"/app_storage/sideband_config" - self.identity_path = self.app_dir+"/app_storage/primary_identity" - self.db_path = self.app_dir+"/app_storage/sideband.db" - self.lxmf_storage = self.app_dir+"/app_storage/" - self.log_dir = self.app_dir+"/app_storage/" - self.tmp_dir = self.app_dir+"/app_storage/tmp" - self.exports_dir = self.app_dir+"/exports" + self.config_path = self.app_dir+"/app_storage/sideband_config" + self.identity_path = self.app_dir+"/app_storage/primary_identity" + self.db_path = self.app_dir+"/app_storage/sideband.db" + self.lxmf_storage = self.app_dir+"/app_storage/" + self.log_dir = self.app_dir+"/app_storage/" + self.tmp_dir = self.app_dir+"/app_storage/tmp" + self.exports_dir = self.app_dir+"/exports" + self.telemetry_exclude_path = self.app_dir+"/app_storage/collector_response_excluded" + if RNS.vendor.platformutils.is_android(): self.webshare_dir = "./share/" else: @@ -572,6 +575,31 @@ class SidebandCore(): self.save_configuration() + def __load_telemetry_collector_excluded(self): + if not os.path.isfile(self.telemetry_exclude_path): + try: + file = open(self.telemetry_exclude_path, "wb") + file.write("# To exclude destinations from telemetry\n# collector responses, add them to this\n# file with one destination hash per line\n".encode("utf-8")) + file.close() + except Exception as e: + RNS.log(f"Could not create telemetry collector exclude file at {self.telemetry_exclude_path}", RNS.LOG_ERROR) + + try: + with open(self.telemetry_exclude_path, "rb") as file: + data = file.read().decode("utf-8") + for line in data.splitlines(): + if not line.startswith("#"): + if len(line) >= RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2: + try: + destination_hash = bytes.fromhex(line[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2]) + self.telemetry_response_excluded.append(destination_hash) + except Exception as e: + RNS.log(f"Invalid destination hash {line} in telemetry response exclude file: {e}", RNS.LOG_ERROR) + + except Exception as e: + RNS.log(f"Error while loading telemetry collector response excludes: {e}", RNS.LOG_ERROR) + + def __load_config(self): RNS.log("Loading Sideband identity...", RNS.LOG_DEBUG) self.identity = RNS.Identity.from_file(self.identity_path) @@ -870,6 +898,8 @@ class SidebandCore(): self._db_upgradetables() self.__db_indices() + self.__load_telemetry_collector_excluded() + def __reload_config(self): RNS.log("Reloading Sideband configuration... ", RNS.LOG_DEBUG) with open(self.config_path, "rb") as config_file: @@ -1371,13 +1401,13 @@ class SidebandCore(): else: self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.request_sending", False) - def _service_request_latest_telemetry(self, from_addr=None): + def _service_request_latest_telemetry(self, from_addr=None, is_collector_request=False): if not RNS.vendor.platformutils.is_android(): return False else: if self.is_client: try: - return self.service_rpc_request({"request_latest_telemetry": {"from_addr": from_addr}}) + return self.service_rpc_request({"request_latest_telemetry": {"from_addr": from_addr, "is_collector_request": is_collector_request}}) except Exception as e: RNS.log("Error while requesting latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG) @@ -1386,10 +1416,10 @@ class SidebandCore(): else: return False - def request_latest_telemetry(self, from_addr=None, is_livetrack=False): + def request_latest_telemetry(self, from_addr=None, is_livetrack=False, is_collector_request=False): if self.allow_service_dispatch and self.is_client: try: - return self._service_request_latest_telemetry(from_addr) + return self._service_request_latest_telemetry(from_addr, is_collector_request=is_collector_request) except Exception as e: RNS.log("Error requesting latest telemetry: "+str(e), RNS.LOG_ERROR) @@ -1428,7 +1458,7 @@ class SidebandCore(): request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history lxm_fields = { LXMF.FIELD_COMMANDS: [ - {Commands.TELEMETRY_REQUEST: request_timebase}, + {Commands.TELEMETRY_REQUEST: [request_timebase, is_collector_request]}, ]} lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=True) @@ -1524,7 +1554,7 @@ class SidebandCore(): else: return False - def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False): + def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False, is_collector_response=False): if self.allow_service_dispatch and self.is_client: try: return self._service_send_latest_telemetry(to_addr, stream, is_authorized_telemetry_request) @@ -1566,7 +1596,7 @@ class SidebandCore(): else: desired_method = LXMF.LXMessage.DIRECT - lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True) + lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True, is_collector_response=is_collector_response) if lxm_fields == False and stream == None: return "already_sent" @@ -2065,7 +2095,7 @@ class SidebandCore(): connection.send(send_result) elif "request_latest_telemetry" in call: args = call["request_latest_telemetry"] - send_result = self.request_latest_telemetry(args["from_addr"]) + send_result = self.request_latest_telemetry(args["from_addr"], is_collector_request=args["is_collector_request"]) connection.send(send_result) elif "send_latest_telemetry" in call: args = call["send_latest_telemetry"] @@ -3146,6 +3176,7 @@ class SidebandCore(): tpacked = telemetry_entry[2] appearance = telemetry_entry[3] max_timebase = max(max_timebase, ttstamp) + if self._db_save_telemetry(tsource, tpacked, via = context_dest): RNS.log("Saved telemetry stream entry from "+RNS.prettyhexrep(tsource), RNS.LOG_DEBUG) if appearance != None: @@ -3804,7 +3835,7 @@ class SidebandCore(): if now > last_request_timebase+request_interval: try: RNS.log("Initiating telemetry request to collector", RNS.LOG_DEBUG) - self.request_latest_telemetry(from_addr=self.config["telemetry_collector"]) + self.request_latest_telemetry(from_addr=self.config["telemetry_collector"], is_collector_request=True) except Exception as e: RNS.log("An error occurred while requesting a telemetry update from collector. The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -4428,7 +4459,7 @@ class SidebandCore(): except Exception as e: RNS.log("Error while setting last successul telemetry timebase for "+RNS.prettyhexrep(message.destination_hash), RNS.LOG_DEBUG) - def get_message_fields(self, context_dest, telemetry_update=False, is_authorized_telemetry_request=False, signal_already_sent=False): + def get_message_fields(self, context_dest, telemetry_update=False, is_authorized_telemetry_request=False, signal_already_sent=False, is_collector_response=False): fields = {} send_telemetry = (telemetry_update == True) or (self.should_send_telemetry(context_dest) or is_authorized_telemetry_request) send_appearance = self.config["telemetry_send_appearance"] or send_telemetry @@ -4437,7 +4468,10 @@ class SidebandCore(): telemeter = Telemeter.from_packed(self.latest_packed_telemetry) telemetry_timebase = telemeter.read_all()["time"]["utc"] last_success_tb = (self.getpersistent(f"telemetry.{RNS.hexrep(context_dest, delimit=False)}.last_send_success_timebase") or 0) - if telemetry_timebase > last_success_tb: + if is_collector_response and self.lxmf_destination.hash in self.telemetry_response_excluded: + RNS.log("Not embedding own telemetry collector response since own destination hash is excluded", RNS.LOG_DEBUG) + send_telemetry = False + elif telemetry_timebase > last_success_tb: RNS.log("Embedding own telemetry in message since current telemetry is newer than latest successful timebase", RNS.LOG_DEBUG) else: RNS.log("Not embedding own telemetry in message since current telemetry timebase ("+str(telemetry_timebase)+") is not newer than latest successful timebase ("+str(last_success_tb)+")", RNS.LOG_DEBUG) @@ -5200,11 +5234,19 @@ class SidebandCore(): RNS.log("Handling commands from "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG) for command in commands: if Commands.TELEMETRY_REQUEST in command: - timebase = int(command[Commands.TELEMETRY_REQUEST]) + if type(command[Commands.TELEMETRY_REQUEST]) == list: + command_timebase = command[Commands.TELEMETRY_REQUEST][0] + enable_collector_request = command[Commands.TELEMETRY_REQUEST][1] + else: + # Handle old request format + command_timebase = command[Commands.TELEMETRY_REQUEST] + enable_collector_request = True + + timebase = int(command_timebase) RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG) - if self.config["telemetry_collector_enabled"]: + if self.config["telemetry_collector_enabled"] and enable_collector_request: RNS.log(f"Collector requests enabled, returning complete telemetry response for all known objects since {timebase}", RNS.LOG_DEBUG) - self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase, is_authorized_telemetry_request=True) + self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase, is_authorized_telemetry_request=True, is_collector_response=True) else: RNS.log("Responding with own latest telemetry", RNS.LOG_DEBUG) self.send_latest_telemetry(to_addr=context_dest) @@ -5240,7 +5282,7 @@ class SidebandCore(): except Exception as e: RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR) - def create_telemetry_collector_response(self, to_addr, timebase, is_authorized_telemetry_request=False): + def create_telemetry_collector_response(self, to_addr, timebase, is_authorized_telemetry_request=False, is_collector_response=False): if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True: RNS.log("Not sending new telemetry collector response, since an earlier transfer is already in progress", RNS.LOG_DEBUG) return "in_progress" @@ -5252,20 +5294,23 @@ class SidebandCore(): elements = 0; added = 0 telemetry_stream = [] for source in sources: - if source != to_addr: - for entry in sources[source]: - elements += 1 - timestamp = entry[0]; packed_telemetry = entry[1] - appearance = self._db_get_appearance(source, raw=True) - te = [source, timestamp, packed_telemetry, appearance] - if only_latest: - if not source in added_sources: - added_sources[source] = True + if source in self.telemetry_response_excluded: + RNS.log(f"Excluding {RNS.prettyhexrep(source)} from collector response", RNS.LOG_DEBUG) + else: + if source != to_addr: + for entry in sources[source]: + elements += 1 + timestamp = entry[0]; packed_telemetry = entry[1] + appearance = self._db_get_appearance(source, raw=True) + te = [source, timestamp, packed_telemetry, appearance] + if only_latest: + if not source in added_sources: + added_sources[source] = True + telemetry_stream.append(te) + added += 1 + else: telemetry_stream.append(te) added += 1 - else: - telemetry_stream.append(te) - added += 1 if len(telemetry_stream) == 0: RNS.log(f"No new telemetry for request with timebase {timebase}", RNS.LOG_DEBUG) @@ -5273,7 +5318,8 @@ class SidebandCore(): return self.send_latest_telemetry( to_addr=to_addr, stream=telemetry_stream, - is_authorized_telemetry_request=is_authorized_telemetry_request + is_authorized_telemetry_request=is_authorized_telemetry_request, + is_collector_response=is_collector_response, )