Added ability to exclude objects from telemetry collector responses

This commit is contained in:
Mark Qvist 2025-05-12 00:08:57 +02:00
commit 2638688bbc
2 changed files with 82 additions and 36 deletions

View file

@ -5464,7 +5464,7 @@ class SidebandApp(MDApp):
self.telemetry_info_dialog.dismiss()
ok_button.bind(on_release=dl_ok)
result = self.sideband.request_latest_telemetry(from_addr=self.sideband.config["telemetry_collector"])
result = self.sideband.request_latest_telemetry(from_addr=self.sideband.config["telemetry_collector"], is_collector_request=True)
if result == "no_address":
title_str = "Invalid Address"

View file

@ -177,6 +177,7 @@ class SidebandCore():
self.latest_telemetry = None
self.latest_packed_telemetry = None
self.telemetry_changes = 0
self.telemetry_response_excluded = []
self.pending_telemetry_send = False
self.pending_telemetry_send_try = 0
self.pending_telemetry_send_maxtries = 2
@ -253,13 +254,15 @@ class SidebandCore():
if not os.path.isdir(self.app_dir+"/app_storage"):
os.makedirs(self.app_dir+"/app_storage")
self.config_path = self.app_dir+"/app_storage/sideband_config"
self.identity_path = self.app_dir+"/app_storage/primary_identity"
self.db_path = self.app_dir+"/app_storage/sideband.db"
self.lxmf_storage = self.app_dir+"/app_storage/"
self.log_dir = self.app_dir+"/app_storage/"
self.tmp_dir = self.app_dir+"/app_storage/tmp"
self.exports_dir = self.app_dir+"/exports"
self.config_path = self.app_dir+"/app_storage/sideband_config"
self.identity_path = self.app_dir+"/app_storage/primary_identity"
self.db_path = self.app_dir+"/app_storage/sideband.db"
self.lxmf_storage = self.app_dir+"/app_storage/"
self.log_dir = self.app_dir+"/app_storage/"
self.tmp_dir = self.app_dir+"/app_storage/tmp"
self.exports_dir = self.app_dir+"/exports"
self.telemetry_exclude_path = self.app_dir+"/app_storage/collector_response_excluded"
if RNS.vendor.platformutils.is_android():
self.webshare_dir = "./share/"
else:
@ -572,6 +575,31 @@ class SidebandCore():
self.save_configuration()
def __load_telemetry_collector_excluded(self):
if not os.path.isfile(self.telemetry_exclude_path):
try:
file = open(self.telemetry_exclude_path, "wb")
file.write("# To exclude destinations from telemetry\n# collector responses, add them to this\n# file with one destination hash per line\n".encode("utf-8"))
file.close()
except Exception as e:
RNS.log(f"Could not create telemetry collector exclude file at {self.telemetry_exclude_path}", RNS.LOG_ERROR)
try:
with open(self.telemetry_exclude_path, "rb") as file:
data = file.read().decode("utf-8")
for line in data.splitlines():
if not line.startswith("#"):
if len(line) >= RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2:
try:
destination_hash = bytes.fromhex(line[:RNS.Reticulum.TRUNCATED_HASHLENGTH//8*2])
self.telemetry_response_excluded.append(destination_hash)
except Exception as e:
RNS.log(f"Invalid destination hash {line} in telemetry response exclude file: {e}", RNS.LOG_ERROR)
except Exception as e:
RNS.log(f"Error while loading telemetry collector response excludes: {e}", RNS.LOG_ERROR)
def __load_config(self):
RNS.log("Loading Sideband identity...", RNS.LOG_DEBUG)
self.identity = RNS.Identity.from_file(self.identity_path)
@ -870,6 +898,8 @@ class SidebandCore():
self._db_upgradetables()
self.__db_indices()
self.__load_telemetry_collector_excluded()
def __reload_config(self):
RNS.log("Reloading Sideband configuration... ", RNS.LOG_DEBUG)
with open(self.config_path, "rb") as config_file:
@ -1371,13 +1401,13 @@ class SidebandCore():
else:
self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.request_sending", False)
def _service_request_latest_telemetry(self, from_addr=None):
def _service_request_latest_telemetry(self, from_addr=None, is_collector_request=False):
if not RNS.vendor.platformutils.is_android():
return False
else:
if self.is_client:
try:
return self.service_rpc_request({"request_latest_telemetry": {"from_addr": from_addr}})
return self.service_rpc_request({"request_latest_telemetry": {"from_addr": from_addr, "is_collector_request": is_collector_request}})
except Exception as e:
RNS.log("Error while requesting latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG)
@ -1386,10 +1416,10 @@ class SidebandCore():
else:
return False
def request_latest_telemetry(self, from_addr=None, is_livetrack=False):
def request_latest_telemetry(self, from_addr=None, is_livetrack=False, is_collector_request=False):
if self.allow_service_dispatch and self.is_client:
try:
return self._service_request_latest_telemetry(from_addr)
return self._service_request_latest_telemetry(from_addr, is_collector_request=is_collector_request)
except Exception as e:
RNS.log("Error requesting latest telemetry: "+str(e), RNS.LOG_ERROR)
@ -1428,7 +1458,7 @@ class SidebandCore():
request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history
lxm_fields = { LXMF.FIELD_COMMANDS: [
{Commands.TELEMETRY_REQUEST: request_timebase},
{Commands.TELEMETRY_REQUEST: [request_timebase, is_collector_request]},
]}
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields, include_ticket=True)
@ -1524,7 +1554,7 @@ class SidebandCore():
else:
return False
def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False):
def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False, is_collector_response=False):
if self.allow_service_dispatch and self.is_client:
try:
return self._service_send_latest_telemetry(to_addr, stream, is_authorized_telemetry_request)
@ -1566,7 +1596,7 @@ class SidebandCore():
else:
desired_method = LXMF.LXMessage.DIRECT
lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True)
lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request, signal_already_sent=True, is_collector_response=is_collector_response)
if lxm_fields == False and stream == None:
return "already_sent"
@ -2065,7 +2095,7 @@ class SidebandCore():
connection.send(send_result)
elif "request_latest_telemetry" in call:
args = call["request_latest_telemetry"]
send_result = self.request_latest_telemetry(args["from_addr"])
send_result = self.request_latest_telemetry(args["from_addr"], is_collector_request=args["is_collector_request"])
connection.send(send_result)
elif "send_latest_telemetry" in call:
args = call["send_latest_telemetry"]
@ -3146,6 +3176,7 @@ class SidebandCore():
tpacked = telemetry_entry[2]
appearance = telemetry_entry[3]
max_timebase = max(max_timebase, ttstamp)
if self._db_save_telemetry(tsource, tpacked, via = context_dest):
RNS.log("Saved telemetry stream entry from "+RNS.prettyhexrep(tsource), RNS.LOG_DEBUG)
if appearance != None:
@ -3804,7 +3835,7 @@ class SidebandCore():
if now > last_request_timebase+request_interval:
try:
RNS.log("Initiating telemetry request to collector", RNS.LOG_DEBUG)
self.request_latest_telemetry(from_addr=self.config["telemetry_collector"])
self.request_latest_telemetry(from_addr=self.config["telemetry_collector"], is_collector_request=True)
except Exception as e:
RNS.log("An error occurred while requesting a telemetry update from collector. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -4428,7 +4459,7 @@ class SidebandCore():
except Exception as e:
RNS.log("Error while setting last successul telemetry timebase for "+RNS.prettyhexrep(message.destination_hash), RNS.LOG_DEBUG)
def get_message_fields(self, context_dest, telemetry_update=False, is_authorized_telemetry_request=False, signal_already_sent=False):
def get_message_fields(self, context_dest, telemetry_update=False, is_authorized_telemetry_request=False, signal_already_sent=False, is_collector_response=False):
fields = {}
send_telemetry = (telemetry_update == True) or (self.should_send_telemetry(context_dest) or is_authorized_telemetry_request)
send_appearance = self.config["telemetry_send_appearance"] or send_telemetry
@ -4437,7 +4468,10 @@ class SidebandCore():
telemeter = Telemeter.from_packed(self.latest_packed_telemetry)
telemetry_timebase = telemeter.read_all()["time"]["utc"]
last_success_tb = (self.getpersistent(f"telemetry.{RNS.hexrep(context_dest, delimit=False)}.last_send_success_timebase") or 0)
if telemetry_timebase > last_success_tb:
if is_collector_response and self.lxmf_destination.hash in self.telemetry_response_excluded:
RNS.log("Not embedding own telemetry collector response since own destination hash is excluded", RNS.LOG_DEBUG)
send_telemetry = False
elif telemetry_timebase > last_success_tb:
RNS.log("Embedding own telemetry in message since current telemetry is newer than latest successful timebase", RNS.LOG_DEBUG)
else:
RNS.log("Not embedding own telemetry in message since current telemetry timebase ("+str(telemetry_timebase)+") is not newer than latest successful timebase ("+str(last_success_tb)+")", RNS.LOG_DEBUG)
@ -5200,11 +5234,19 @@ class SidebandCore():
RNS.log("Handling commands from "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG)
for command in commands:
if Commands.TELEMETRY_REQUEST in command:
timebase = int(command[Commands.TELEMETRY_REQUEST])
if type(command[Commands.TELEMETRY_REQUEST]) == list:
command_timebase = command[Commands.TELEMETRY_REQUEST][0]
enable_collector_request = command[Commands.TELEMETRY_REQUEST][1]
else:
# Handle old request format
command_timebase = command[Commands.TELEMETRY_REQUEST]
enable_collector_request = True
timebase = int(command_timebase)
RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG)
if self.config["telemetry_collector_enabled"]:
if self.config["telemetry_collector_enabled"] and enable_collector_request:
RNS.log(f"Collector requests enabled, returning complete telemetry response for all known objects since {timebase}", RNS.LOG_DEBUG)
self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase, is_authorized_telemetry_request=True)
self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase, is_authorized_telemetry_request=True, is_collector_response=True)
else:
RNS.log("Responding with own latest telemetry", RNS.LOG_DEBUG)
self.send_latest_telemetry(to_addr=context_dest)
@ -5240,7 +5282,7 @@ class SidebandCore():
except Exception as e:
RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR)
def create_telemetry_collector_response(self, to_addr, timebase, is_authorized_telemetry_request=False):
def create_telemetry_collector_response(self, to_addr, timebase, is_authorized_telemetry_request=False, is_collector_response=False):
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
RNS.log("Not sending new telemetry collector response, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
@ -5252,20 +5294,23 @@ class SidebandCore():
elements = 0; added = 0
telemetry_stream = []
for source in sources:
if source != to_addr:
for entry in sources[source]:
elements += 1
timestamp = entry[0]; packed_telemetry = entry[1]
appearance = self._db_get_appearance(source, raw=True)
te = [source, timestamp, packed_telemetry, appearance]
if only_latest:
if not source in added_sources:
added_sources[source] = True
if source in self.telemetry_response_excluded:
RNS.log(f"Excluding {RNS.prettyhexrep(source)} from collector response", RNS.LOG_DEBUG)
else:
if source != to_addr:
for entry in sources[source]:
elements += 1
timestamp = entry[0]; packed_telemetry = entry[1]
appearance = self._db_get_appearance(source, raw=True)
te = [source, timestamp, packed_telemetry, appearance]
if only_latest:
if not source in added_sources:
added_sources[source] = True
telemetry_stream.append(te)
added += 1
else:
telemetry_stream.append(te)
added += 1
else:
telemetry_stream.append(te)
added += 1
if len(telemetry_stream) == 0:
RNS.log(f"No new telemetry for request with timebase {timebase}", RNS.LOG_DEBUG)
@ -5273,7 +5318,8 @@ class SidebandCore():
return self.send_latest_telemetry(
to_addr=to_addr,
stream=telemetry_stream,
is_authorized_telemetry_request=is_authorized_telemetry_request
is_authorized_telemetry_request=is_authorized_telemetry_request,
is_collector_response=is_collector_response,
)