mirror of
https://github.com/markqvist/NomadNet.git
synced 2026-04-27 14:10:31 +00:00
906 lines
37 KiB
Python
906 lines
37 KiB
Python
import os
|
|
import RNS
|
|
import LXMF
|
|
import shutil
|
|
import RNS.vendor.umsgpack as msgpack
|
|
import nomadnet
|
|
from nomadnet.Directory import DirectoryEntry
|
|
|
|
class Conversation:
|
|
cached_conversations = {}
|
|
unread_conversations = {}
|
|
created_callback = None
|
|
|
|
aspect_filter = "lxmf.delivery"
|
|
@staticmethod
|
|
def received_announce(destination_hash, announced_identity, app_data):
|
|
app = nomadnet.NomadNetworkApp.get_shared_instance()
|
|
|
|
if not destination_hash in app.ignored_list:
|
|
destination_hash_text = RNS.hexrep(destination_hash, delimit=False)
|
|
# Check if the announced destination is in
|
|
# our list of conversations
|
|
if destination_hash_text in [e[0] for e in Conversation.conversation_list(app)]:
|
|
if app.directory.find(destination_hash):
|
|
if Conversation.created_callback != None:
|
|
Conversation.created_callback()
|
|
else:
|
|
if Conversation.created_callback != None:
|
|
Conversation.created_callback()
|
|
|
|
# This reformats the new v0.5.0 announce data back to the expected format
|
|
# for nomadnets storage and other handling functions.
|
|
dn = LXMF.display_name_from_app_data(app_data)
|
|
app_data = b""
|
|
if dn != None:
|
|
app_data = dn.encode("utf-8")
|
|
|
|
# Add the announce to the directory announce
|
|
# stream logger
|
|
app.directory.lxmf_announce_received(destination_hash, app_data)
|
|
|
|
else:
|
|
RNS.log("Ignored announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
|
|
|
|
@staticmethod
|
|
def query_for_peer(source_hash):
|
|
try:
|
|
RNS.Transport.request_path(bytes.fromhex(source_hash))
|
|
except Exception as e:
|
|
RNS.log("Error while querying network for peer identity. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
|
|
|
@staticmethod
|
|
def ingest(lxmessage, app, originator = False, delegate = None):
|
|
if originator:
|
|
source_hash = lxmessage.destination_hash
|
|
else:
|
|
source_hash = lxmessage.source_hash
|
|
|
|
source_hash_path = RNS.hexrep(source_hash, delimit=False)
|
|
|
|
conversation_path = app.conversationpath + "/" + source_hash_path
|
|
|
|
if not os.path.isdir(conversation_path):
|
|
os.makedirs(conversation_path)
|
|
if Conversation.created_callback != None:
|
|
Conversation.created_callback()
|
|
|
|
ingested_path = lxmessage.write_to_directory(conversation_path)
|
|
|
|
try:
|
|
ConversationMessage.extract_attachments_from_lxm(lxmessage, app)
|
|
except Exception as e:
|
|
RNS.log("Error extracting attachments: "+str(e), RNS.LOG_ERROR)
|
|
|
|
if RNS.hexrep(source_hash, delimit=False) in Conversation.cached_conversations:
|
|
conversation = Conversation.cached_conversations[RNS.hexrep(source_hash, delimit=False)]
|
|
conversation.scan_storage()
|
|
|
|
if source_hash in Conversation.unread_conversations:
|
|
Conversation.unread_conversations[source_hash] += 1
|
|
else:
|
|
Conversation.unread_conversations[source_hash] = 1
|
|
|
|
try:
|
|
dirname = RNS.hexrep(source_hash, delimit=False)
|
|
with open(app.conversationpath + "/" + dirname + "/unread", "w") as uf:
|
|
uf.write(str(Conversation.unread_conversations[source_hash]))
|
|
except Exception as e:
|
|
pass
|
|
|
|
if Conversation.created_callback != None:
|
|
Conversation.created_callback()
|
|
|
|
return ingested_path
|
|
|
|
@staticmethod
|
|
def conversation_list(app):
|
|
conversations = []
|
|
for dirname in os.listdir(app.conversationpath):
|
|
if len(dirname) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2 and os.path.isdir(app.conversationpath + "/" + dirname):
|
|
try:
|
|
source_hash_text = dirname
|
|
source_hash = bytes.fromhex(dirname)
|
|
app_data = RNS.Identity.recall_app_data(source_hash)
|
|
display_name = app.directory.display_name(source_hash)
|
|
|
|
unread = 0
|
|
if source_hash in Conversation.unread_conversations:
|
|
unread = Conversation.unread_conversations[source_hash]
|
|
elif os.path.isfile(app.conversationpath + "/" + dirname + "/unread"):
|
|
try:
|
|
with open(app.conversationpath + "/" + dirname + "/unread", "r") as uf:
|
|
content = uf.read().strip()
|
|
unread = int(content) if content else 1
|
|
except Exception:
|
|
unread = 1
|
|
Conversation.unread_conversations[source_hash] = unread
|
|
|
|
if display_name == None and app_data:
|
|
display_name = LXMF.display_name_from_app_data(app_data)
|
|
|
|
if display_name == None:
|
|
sort_name = ""
|
|
else:
|
|
sort_name = display_name
|
|
|
|
trust_level = app.directory.trust_level(source_hash, display_name)
|
|
|
|
conversation_dir = app.conversationpath + "/" + dirname
|
|
try:
|
|
last_activity = os.path.getmtime(conversation_dir)
|
|
except Exception:
|
|
last_activity = 0
|
|
|
|
entry = (source_hash_text, display_name, trust_level, sort_name, unread, last_activity)
|
|
conversations.append(entry)
|
|
|
|
except Exception as e:
|
|
RNS.log("Error while loading conversation "+str(dirname)+", skipping it. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
|
|
|
conversations.sort(key=lambda e: e[5], reverse=True)
|
|
|
|
return conversations
|
|
|
|
@staticmethod
|
|
def cache_conversation(conversation):
|
|
Conversation.cached_conversations[conversation.source_hash] = conversation
|
|
|
|
@staticmethod
|
|
def delete_conversation(source_hash_path, app):
|
|
conversation_path = app.conversationpath + "/" + source_hash_path
|
|
|
|
try:
|
|
if os.path.isdir(conversation_path):
|
|
shutil.rmtree(conversation_path)
|
|
except Exception as e:
|
|
RNS.log("Could not remove conversation at "+str(conversation_path)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
|
|
|
|
def __init__(self, source_hash, app, initiator=False):
|
|
self.app = app
|
|
self.source_hash = source_hash
|
|
self.send_destination = None
|
|
self.messages = []
|
|
self.messages_path = app.conversationpath + "/" + source_hash
|
|
self.messages_load_time = None
|
|
self.source_known = False
|
|
self.source_trusted = False
|
|
self.source_blocked = False
|
|
self.unread = False
|
|
|
|
self.__changed_callback = None
|
|
|
|
if not RNS.Identity.recall(bytes.fromhex(self.source_hash)):
|
|
RNS.Transport.request_path(bytes.fromhex(source_hash))
|
|
|
|
self.source_identity = RNS.Identity.recall(bytes.fromhex(self.source_hash))
|
|
|
|
if self.source_identity:
|
|
self.source_known = True
|
|
self.send_destination = RNS.Destination(self.source_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
|
|
|
|
if initiator:
|
|
if not os.path.isdir(self.messages_path):
|
|
os.makedirs(self.messages_path)
|
|
if Conversation.created_callback != None:
|
|
Conversation.created_callback()
|
|
|
|
self.scan_storage()
|
|
|
|
self.trust_level = app.directory.trust_level(bytes.fromhex(self.source_hash))
|
|
|
|
Conversation.cache_conversation(self)
|
|
|
|
def scan_storage(self):
|
|
old_len = len(self.messages)
|
|
existing = {}
|
|
for msg in self.messages:
|
|
existing[msg.file_path] = msg
|
|
|
|
index = ConversationMessage.read_index(self.messages_path)
|
|
|
|
self.messages = []
|
|
for filename in os.listdir(self.messages_path):
|
|
if len(filename) == RNS.Identity.HASHLENGTH//8*2:
|
|
message_path = self.messages_path + "/" + filename
|
|
if message_path in existing:
|
|
old_msg = existing[message_path]
|
|
try:
|
|
current_mtime = os.path.getmtime(message_path)
|
|
if current_mtime > old_msg.sort_timestamp:
|
|
old_msg._cached_state = None
|
|
old_msg._cached_method = None
|
|
old_msg.sort_timestamp = current_mtime
|
|
except Exception:
|
|
pass
|
|
self.messages.append(old_msg)
|
|
else:
|
|
msg = ConversationMessage(message_path)
|
|
if filename in index:
|
|
msg.restore_from_index(index[filename])
|
|
self.messages.append(msg)
|
|
|
|
new_len = len(self.messages)
|
|
|
|
needs_index_update = []
|
|
for msg in self.messages:
|
|
filename = os.path.basename(msg.file_path)
|
|
if msg._cached_state is not None:
|
|
if filename not in index:
|
|
needs_index_update.append(msg)
|
|
elif "content" not in index[filename]:
|
|
needs_index_update.append(msg)
|
|
if needs_index_update:
|
|
ConversationMessage.write_index(self.messages_path, needs_index_update)
|
|
|
|
if new_len > old_len:
|
|
self.unread = True
|
|
|
|
if self.__changed_callback != None:
|
|
self.__changed_callback(self)
|
|
|
|
def purge_failed(self):
|
|
purged_messages = []
|
|
for conversation_message in self.messages:
|
|
if conversation_message.get_state() == LXMF.LXMessage.FAILED:
|
|
purged_messages.append(conversation_message)
|
|
conversation_message.purge()
|
|
|
|
for purged_message in purged_messages:
|
|
self.messages.remove(purged_message)
|
|
|
|
def clear_history(self):
|
|
purged_messages = []
|
|
for conversation_message in self.messages:
|
|
purged_messages.append(conversation_message)
|
|
conversation_message.purge()
|
|
|
|
for purged_message in purged_messages:
|
|
self.messages.remove(purged_message)
|
|
|
|
def register_changed_callback(self, callback):
|
|
self.__changed_callback = callback
|
|
|
|
def send(self, content="", title="", fields=None):
|
|
if self.send_destination:
|
|
dest = self.send_destination
|
|
source = self.app.lxmf_destination
|
|
desired_method = LXMF.LXMessage.DIRECT
|
|
if self.app.directory.preferred_delivery(dest.hash) == DirectoryEntry.PROPAGATED:
|
|
if self.app.message_router.get_outbound_propagation_node() != None:
|
|
desired_method = LXMF.LXMessage.PROPAGATED
|
|
else:
|
|
if not self.app.message_router.delivery_link_available(dest.hash) and RNS.Identity.current_ratchet_id(dest.hash) != None:
|
|
RNS.log(f"Have ratchet for {RNS.prettyhexrep(dest.hash)}, requesting opportunistic delivery of message", RNS.LOG_DEBUG)
|
|
desired_method = LXMF.LXMessage.OPPORTUNISTIC
|
|
|
|
dest_is_trusted = False
|
|
if self.app.directory.trust_level(dest.hash) == DirectoryEntry.TRUSTED:
|
|
dest_is_trusted = True
|
|
|
|
lxm = LXMF.LXMessage(dest, source, content, title=title, fields=fields, desired_method=desired_method, include_ticket=dest_is_trusted)
|
|
lxm.register_delivery_callback(self.message_notification)
|
|
lxm.register_failed_callback(self.message_notification)
|
|
|
|
if self.app.message_router.get_outbound_propagation_node() != None:
|
|
lxm.try_propagation_on_fail = self.app.try_propagation_on_fail
|
|
|
|
self.app.message_router.handle_outbound(lxm)
|
|
|
|
message_path = Conversation.ingest(lxm, self.app, originator=True)
|
|
self.messages.append(ConversationMessage(message_path))
|
|
|
|
return True
|
|
else:
|
|
RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
|
|
return False
|
|
|
|
def paper_output(self, content="", title="", mode="print_qr"):
|
|
if self.send_destination:
|
|
try:
|
|
dest = self.send_destination
|
|
source = self.app.lxmf_destination
|
|
desired_method = LXMF.LXMessage.PAPER
|
|
|
|
lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method)
|
|
|
|
if mode == "print_qr":
|
|
qr_code = lxm.as_qr()
|
|
qr_tmp_path = self.app.tmpfilespath+"/"+str(RNS.hexrep(lxm.hash, delimit=False))
|
|
qr_code.save(qr_tmp_path)
|
|
|
|
print_result = self.app.print_file(qr_tmp_path)
|
|
os.unlink(qr_tmp_path)
|
|
|
|
if print_result:
|
|
message_path = Conversation.ingest(lxm, self.app, originator=True)
|
|
self.messages.append(ConversationMessage(message_path))
|
|
|
|
return print_result
|
|
|
|
elif mode == "save_qr":
|
|
qr_code = lxm.as_qr()
|
|
qr_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".png")
|
|
qr_code.save(qr_save_path)
|
|
message_path = Conversation.ingest(lxm, self.app, originator=True)
|
|
self.messages.append(ConversationMessage(message_path))
|
|
return qr_save_path
|
|
|
|
elif mode == "save_uri":
|
|
lxm_uri = lxm.as_uri()+"\n"
|
|
uri_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".txt")
|
|
with open(uri_save_path, "wb") as f:
|
|
f.write(lxm_uri.encode("utf-8"))
|
|
|
|
message_path = Conversation.ingest(lxm, self.app, originator=True)
|
|
self.messages.append(ConversationMessage(message_path))
|
|
return uri_save_path
|
|
|
|
elif mode == "return_uri":
|
|
return lxm.as_uri()
|
|
|
|
except Exception as e:
|
|
RNS.log("An error occurred while generating paper message, the contained exception was: "+str(e), RNS.LOG_ERROR)
|
|
return False
|
|
|
|
else:
|
|
RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
|
|
return False
|
|
|
|
def message_notification(self, message):
|
|
if message.state == LXMF.LXMessage.FAILED and hasattr(message, "try_propagation_on_fail") and message.try_propagation_on_fail:
|
|
if hasattr(message, "stamp_generation_failed") and message.stamp_generation_failed == True:
|
|
RNS.log(f"Could not send {message} due to a stamp generation failure", RNS.LOG_ERROR)
|
|
else:
|
|
RNS.log("Direct delivery of "+str(message)+" failed. Retrying as propagated message.", RNS.LOG_VERBOSE)
|
|
message.try_propagation_on_fail = None
|
|
message.delivery_attempts = 0
|
|
if hasattr(message, "next_delivery_attempt"):
|
|
del message.next_delivery_attempt
|
|
message.packed = None
|
|
message.desired_method = LXMF.LXMessage.PROPAGATED
|
|
self.app.message_router.handle_outbound(message)
|
|
else:
|
|
message_path = Conversation.ingest(message, self.app, originator=True)
|
|
|
|
def __str__(self):
|
|
string = self.source_hash
|
|
|
|
# TODO: Remove this
|
|
# if self.source_identity:
|
|
# if self.source_identity.app_data:
|
|
# # TODO: Sanitise for viewing, or just clean this
|
|
# string += " | "+self.source_identity.app_data.decode("utf-8")
|
|
|
|
return string
|
|
|
|
|
|
|
|
class ConversationMessage:
|
|
def __init__(self, file_path):
|
|
self.file_path = file_path
|
|
self.loaded = False
|
|
self.timestamp = None
|
|
self.lxm = None
|
|
|
|
self._cached_hash = None
|
|
self._cached_state = None
|
|
self._cached_title = None
|
|
self._cached_content = None
|
|
self._cached_source_hash = None
|
|
self._cached_transport_encrypted = None
|
|
self._cached_transport_encryption = None
|
|
self._cached_signature_validated = None
|
|
self._cached_unverified_reason = None
|
|
self._cached_method = None
|
|
self._cached_has_attachments = None
|
|
self._cached_attachment_names = None
|
|
|
|
self.sort_timestamp = os.path.getmtime(file_path) if os.path.isfile(file_path) else 0
|
|
|
|
filename = os.path.basename(file_path)
|
|
if len(filename) == RNS.Identity.HASHLENGTH//8*2:
|
|
try:
|
|
self._cached_hash = bytes.fromhex(filename)
|
|
except Exception:
|
|
pass
|
|
|
|
def load(self):
|
|
try:
|
|
with open(self.file_path, "rb") as lxm_file:
|
|
self.lxm = LXMF.LXMessage.unpack_from_file(lxm_file)
|
|
self.loaded = True
|
|
self.timestamp = self.lxm.timestamp
|
|
self.sort_timestamp = os.path.getmtime(self.file_path)
|
|
|
|
if self.lxm.state > LXMF.LXMessage.GENERATING and self.lxm.state < LXMF.LXMessage.SENT:
|
|
found = False
|
|
|
|
for pending in nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_outbound:
|
|
if pending.hash == self.lxm.hash:
|
|
found = True
|
|
|
|
for pending_id in nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_deferred_stamps:
|
|
if pending_id == self.lxm.hash:
|
|
found = True
|
|
|
|
if not found:
|
|
self.lxm.state = LXMF.LXMessage.FAILED
|
|
|
|
if self._cached_hash is None:
|
|
self._cached_hash = self.lxm.hash
|
|
self._cached_state = self.lxm.state
|
|
if self._cached_source_hash is None:
|
|
self._cached_source_hash = self.lxm.source_hash
|
|
self._cached_transport_encrypted = self.lxm.transport_encrypted
|
|
self._cached_transport_encryption = self.lxm.transport_encryption
|
|
if self._cached_signature_validated is None:
|
|
self._cached_signature_validated = self.lxm.signature_validated
|
|
self._cached_method = self.lxm.method
|
|
if self._cached_unverified_reason is None and hasattr(self.lxm, "unverified_reason"):
|
|
self._cached_unverified_reason = self.lxm.unverified_reason
|
|
self._cached_title = self.lxm.title_as_string()
|
|
self._cached_content = self.lxm.content_as_string()
|
|
|
|
if self._cached_has_attachments is None:
|
|
found_in_fields = False
|
|
fields = None
|
|
if hasattr(self.lxm, "get_fields"):
|
|
fields = self.lxm.get_fields()
|
|
if fields and isinstance(fields, dict):
|
|
found_in_fields = (
|
|
LXMF.FIELD_FILE_ATTACHMENTS in fields
|
|
or LXMF.FIELD_IMAGE in fields
|
|
or LXMF.FIELD_AUDIO in fields
|
|
)
|
|
if found_in_fields:
|
|
names = []
|
|
file_atts = fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
|
|
for att in file_atts:
|
|
if isinstance(att, list) and len(att) >= 2:
|
|
size = len(att[1]) if isinstance(att[1], bytes) else 0
|
|
names.append(("file", str(att[0]), size))
|
|
if LXMF.FIELD_IMAGE in fields:
|
|
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_IMAGE])
|
|
if data:
|
|
size = len(data)
|
|
ext = ConversationMessage._ext_from_media_format(fmt, data)
|
|
names.append(("file", "image"+ext, size))
|
|
if LXMF.FIELD_AUDIO in fields:
|
|
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_AUDIO])
|
|
if data:
|
|
size = len(data)
|
|
ext = ConversationMessage._ext_from_media_format(fmt, data, is_audio=True)
|
|
names.append(("file", "audio"+ext, size))
|
|
self._cached_has_attachments = True
|
|
self._cached_attachment_names = names
|
|
|
|
if not found_in_fields:
|
|
att_dir = self._attachment_dir()
|
|
if att_dir and os.path.isdir(att_dir):
|
|
manifest = self._read_attachment_manifest(att_dir)
|
|
if manifest and "files" in manifest and len(manifest["files"]) > 0:
|
|
names = []
|
|
for entry in manifest["files"]:
|
|
names.append(("file", entry["name"], entry.get("size", 0)))
|
|
self._cached_has_attachments = True
|
|
self._cached_attachment_names = names
|
|
else:
|
|
self._cached_has_attachments = False
|
|
self._cached_attachment_names = []
|
|
else:
|
|
self._cached_has_attachments = False
|
|
self._cached_attachment_names = []
|
|
|
|
try:
|
|
app = nomadnet.NomadNetworkApp.get_shared_instance()
|
|
ConversationMessage.extract_attachments_from_lxm(self.lxm, app)
|
|
ConversationMessage.strip_attachments_from_file(self.file_path, app)
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as e:
|
|
RNS.log("Error while loading LXMF message "+str(self.file_path)+" from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
|
|
|
def unload(self):
|
|
self.loaded = False
|
|
self.lxm = None
|
|
|
|
def purge(self):
|
|
self.unload()
|
|
if os.path.isfile(self.file_path):
|
|
os.unlink(self.file_path)
|
|
|
|
def get_timestamp(self):
|
|
if self.timestamp is not None:
|
|
return self.timestamp
|
|
if not self.loaded:
|
|
self.load()
|
|
return self.timestamp
|
|
|
|
def get_title(self):
|
|
if self._cached_title is not None:
|
|
return self._cached_title
|
|
if not self.loaded:
|
|
self.load()
|
|
return self._cached_title if self._cached_title is not None else ""
|
|
|
|
def get_content(self):
|
|
if self._cached_content is not None:
|
|
return self._cached_content
|
|
if not self.loaded:
|
|
self.load()
|
|
return self._cached_content if self._cached_content is not None else ""
|
|
|
|
def get_hash(self):
|
|
if self._cached_hash is not None:
|
|
return self._cached_hash
|
|
if not self.loaded:
|
|
self.load()
|
|
return self._cached_hash
|
|
|
|
def get_state(self):
|
|
if self._cached_state is not None:
|
|
return self._cached_state
|
|
if not self.loaded:
|
|
self.load()
|
|
return self._cached_state
|
|
|
|
def get_transport_encryption(self):
|
|
if self._cached_transport_encryption is not None:
|
|
return self._cached_transport_encryption
|
|
if not self.loaded:
|
|
self.load()
|
|
return self._cached_transport_encryption
|
|
|
|
def get_transport_encrypted(self):
|
|
if self._cached_transport_encrypted is not None:
|
|
return self._cached_transport_encrypted
|
|
if not self.loaded:
|
|
self.load()
|
|
return self._cached_transport_encrypted
|
|
|
|
def signature_validated(self):
|
|
if self._cached_signature_validated is not None:
|
|
return self._cached_signature_validated
|
|
if not self.loaded:
|
|
self.load()
|
|
return self._cached_signature_validated
|
|
|
|
def get_signature_description(self):
|
|
if self.signature_validated():
|
|
return "Signature Verified"
|
|
else:
|
|
reason = self._cached_unverified_reason
|
|
if reason == LXMF.LXMessage.SOURCE_UNKNOWN:
|
|
return "Unknown Origin"
|
|
elif reason == LXMF.LXMessage.SIGNATURE_INVALID:
|
|
return "Invalid Signature"
|
|
else:
|
|
return "Unknown signature validation failure"
|
|
|
|
def get_fields(self):
|
|
if not self.loaded:
|
|
self.load()
|
|
|
|
if self.lxm and hasattr(self.lxm, "get_fields"):
|
|
fields = self.lxm.get_fields()
|
|
if fields and isinstance(fields, dict):
|
|
return fields
|
|
|
|
return {}
|
|
|
|
def has_attachments(self):
|
|
if self._cached_has_attachments is not None:
|
|
return self._cached_has_attachments
|
|
fields = self.get_fields()
|
|
return (
|
|
LXMF.FIELD_FILE_ATTACHMENTS in fields
|
|
or LXMF.FIELD_IMAGE in fields
|
|
or LXMF.FIELD_AUDIO in fields
|
|
)
|
|
|
|
def get_file_attachments(self):
|
|
att_dir = self._attachment_dir()
|
|
if att_dir and os.path.isdir(att_dir):
|
|
manifest = self._read_attachment_manifest(att_dir)
|
|
if manifest and "files" in manifest:
|
|
result = []
|
|
for entry in manifest["files"]:
|
|
fpath = os.path.join(att_dir, entry["stored_name"])
|
|
if os.path.isfile(fpath):
|
|
with open(fpath, "rb") as f:
|
|
result.append([entry["name"], f.read()])
|
|
return result
|
|
|
|
fields = self.get_fields()
|
|
return fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
|
|
|
|
def get_image(self):
|
|
att_dir = self._attachment_dir()
|
|
if att_dir and os.path.isdir(att_dir):
|
|
fpath = os.path.join(att_dir, "image")
|
|
if os.path.isfile(fpath):
|
|
with open(fpath, "rb") as f:
|
|
return f.read()
|
|
|
|
fields = self.get_fields()
|
|
return fields.get(LXMF.FIELD_IMAGE, None)
|
|
|
|
def get_audio(self):
|
|
att_dir = self._attachment_dir()
|
|
if att_dir and os.path.isdir(att_dir):
|
|
fpath = os.path.join(att_dir, "audio")
|
|
if os.path.isfile(fpath):
|
|
with open(fpath, "rb") as f:
|
|
return f.read()
|
|
|
|
fields = self.get_fields()
|
|
return fields.get(LXMF.FIELD_AUDIO, None)
|
|
|
|
def get_attachment_file_path(self, field_type, field_index=0):
|
|
att_dir = self._attachment_dir()
|
|
if att_dir and os.path.isdir(att_dir):
|
|
manifest = self._read_attachment_manifest(att_dir)
|
|
if manifest and "files" in manifest and field_index < len(manifest["files"]):
|
|
return os.path.join(att_dir, manifest["files"][field_index]["stored_name"])
|
|
# Fallback for old extraction format
|
|
if field_type == "image":
|
|
fpath = os.path.join(att_dir, "image")
|
|
if os.path.isfile(fpath):
|
|
return fpath
|
|
elif field_type == "audio":
|
|
fpath = os.path.join(att_dir, "audio")
|
|
if os.path.isfile(fpath):
|
|
return fpath
|
|
return None
|
|
|
|
def _attachment_dir(self):
|
|
try:
|
|
app = nomadnet.NomadNetworkApp.get_shared_instance()
|
|
msg_hash = self.get_hash()
|
|
if msg_hash:
|
|
return os.path.join(app.attachmentpath, RNS.hexrep(msg_hash, delimit=False))
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def _read_attachment_manifest(self, att_dir):
|
|
manifest_path = os.path.join(att_dir, "manifest")
|
|
if os.path.isfile(manifest_path):
|
|
try:
|
|
with open(manifest_path, "rb") as f:
|
|
return msgpack.unpackb(f.read(), raw=False)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
@staticmethod
|
|
def extract_attachments_from_lxm(lxmessage, app):
|
|
if not hasattr(lxmessage, "get_fields"):
|
|
return
|
|
fields = lxmessage.get_fields()
|
|
if not fields or not isinstance(fields, dict):
|
|
return
|
|
|
|
has_any = (
|
|
LXMF.FIELD_FILE_ATTACHMENTS in fields
|
|
or LXMF.FIELD_IMAGE in fields
|
|
or LXMF.FIELD_AUDIO in fields
|
|
)
|
|
if not has_any:
|
|
return
|
|
|
|
msg_hash_hex = RNS.hexrep(lxmessage.hash, delimit=False)
|
|
att_dir = os.path.join(app.attachmentpath, msg_hash_hex)
|
|
if os.path.isdir(att_dir):
|
|
return
|
|
|
|
os.makedirs(att_dir)
|
|
manifest = {"files": []}
|
|
|
|
file_attachments = fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
|
|
if file_attachments:
|
|
for idx, att in enumerate(file_attachments):
|
|
if isinstance(att, list) and len(att) >= 2:
|
|
filename = str(att[0])
|
|
data = att[1] if isinstance(att[1], bytes) else b""
|
|
stored_name = "file_"+str(idx)
|
|
with open(os.path.join(att_dir, stored_name), "wb") as f:
|
|
f.write(data)
|
|
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
|
|
|
|
if LXMF.FIELD_IMAGE in fields:
|
|
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_IMAGE])
|
|
if data:
|
|
ext = ConversationMessage._ext_from_media_format(fmt, data)
|
|
filename = "image" + ext
|
|
stored_name = "file_"+str(len(manifest["files"]))
|
|
with open(os.path.join(att_dir, stored_name), "wb") as f:
|
|
f.write(data)
|
|
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
|
|
|
|
if LXMF.FIELD_AUDIO in fields:
|
|
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_AUDIO])
|
|
if data:
|
|
ext = ConversationMessage._ext_from_media_format(fmt, data, is_audio=True)
|
|
filename = "audio" + ext
|
|
stored_name = "file_"+str(len(manifest["files"]))
|
|
with open(os.path.join(att_dir, stored_name), "wb") as f:
|
|
f.write(data)
|
|
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
|
|
|
|
with open(os.path.join(att_dir, "manifest"), "wb") as f:
|
|
f.write(msgpack.packb(manifest))
|
|
|
|
@staticmethod
|
|
def _unpack_media_field(field_data):
|
|
"""Normalize FIELD_IMAGE/FIELD_AUDIO which can be raw bytes or [format, bytes].
|
|
Format element can be a string ('webp') or integer (LXMF audio mode constant)."""
|
|
if isinstance(field_data, bytes):
|
|
return None, field_data
|
|
elif isinstance(field_data, list) and len(field_data) >= 2 and isinstance(field_data[1], bytes):
|
|
return field_data[0], field_data[1]
|
|
return None, None
|
|
|
|
@staticmethod
|
|
def _detect_image_ext(data):
|
|
if not isinstance(data, bytes) or len(data) < 12:
|
|
return ".bin"
|
|
if data[:8] == b'\x89PNG\r\n\x1a\n':
|
|
return ".png"
|
|
elif data[:3] == b'\xff\xd8\xff':
|
|
return ".jpg"
|
|
elif data[:4] == b'GIF8':
|
|
return ".gif"
|
|
elif data[:4] == b'RIFF' and data[8:12] == b'WEBP':
|
|
return ".webp"
|
|
elif data[:4] == b'\x00\x00\x00\x1c' or data[:4] == b'\x00\x00\x00\x18':
|
|
return ".heic"
|
|
return ".bin"
|
|
|
|
@staticmethod
|
|
def _detect_audio_ext(data):
|
|
if not isinstance(data, bytes) or len(data) < 12:
|
|
return ".bin"
|
|
if data[:4] == b'OggS':
|
|
return ".ogg"
|
|
elif data[:2] == b'\xff\xfb' or data[:3] == b'ID3':
|
|
return ".mp3"
|
|
elif data[:4] == b'RIFF' and data[8:12] == b'WAVE':
|
|
return ".wav"
|
|
elif data[:4] == b'fLaC':
|
|
return ".flac"
|
|
return ".bin"
|
|
|
|
@staticmethod
|
|
def _ext_from_media_format(fmt, data, is_audio=False):
|
|
"""Derive file extension from format identifier and data.
|
|
fmt can be a string ('webp'), an integer (LXMF audio mode), or None."""
|
|
if isinstance(fmt, str) and len(fmt) > 0:
|
|
return "." + fmt.lower().strip(".")
|
|
if isinstance(fmt, int) and is_audio:
|
|
if fmt >= 16 and fmt <= 25:
|
|
return ".ogg"
|
|
elif fmt >= 1 and fmt <= 9:
|
|
return ".c2"
|
|
if is_audio:
|
|
return ConversationMessage._detect_audio_ext(data)
|
|
return ConversationMessage._detect_image_ext(data)
|
|
|
|
@staticmethod
|
|
def strip_attachments_from_file(file_path, app):
|
|
try:
|
|
with open(file_path, "rb") as f:
|
|
container = msgpack.unpackb(f.read(), strict_map_key=False)
|
|
|
|
lxmf_bytes = container[b"lxmf_bytes"] if b"lxmf_bytes" in container else container.get("lxmf_bytes")
|
|
if lxmf_bytes is None:
|
|
return
|
|
|
|
dest_len = LXMF.LXMessage.DESTINATION_LENGTH
|
|
sig_len = LXMF.LXMessage.SIGNATURE_LENGTH
|
|
header_len = 2 * dest_len + sig_len
|
|
|
|
header = lxmf_bytes[:header_len]
|
|
payload_bytes = lxmf_bytes[header_len:]
|
|
payload = msgpack.unpackb(payload_bytes, strict_map_key=False)
|
|
|
|
if len(payload) < 4 or not isinstance(payload[3], dict):
|
|
return
|
|
|
|
fields = payload[3]
|
|
attachment_keys = [LXMF.FIELD_FILE_ATTACHMENTS, LXMF.FIELD_IMAGE, LXMF.FIELD_AUDIO]
|
|
if not any(k in fields for k in attachment_keys):
|
|
return
|
|
|
|
# Compute message hash matching LXMF's logic
|
|
if len(payload) > 4:
|
|
hash_payload = msgpack.packb(payload[:4])
|
|
else:
|
|
hash_payload = payload_bytes
|
|
msg_hash = RNS.Identity.full_hash(lxmf_bytes[:2*dest_len] + hash_payload)
|
|
msg_hash_hex = RNS.hexrep(msg_hash, delimit=False)
|
|
|
|
att_dir = os.path.join(app.attachmentpath, msg_hash_hex)
|
|
if not os.path.isdir(att_dir):
|
|
return
|
|
|
|
for k in attachment_keys:
|
|
if k in fields:
|
|
del fields[k]
|
|
|
|
new_lxmf_bytes = header + msgpack.packb(payload)
|
|
key = b"lxmf_bytes" if b"lxmf_bytes" in container else "lxmf_bytes"
|
|
container[key] = new_lxmf_bytes
|
|
|
|
with open(file_path, "wb") as f:
|
|
f.write(msgpack.packb(container))
|
|
|
|
except Exception as e:
|
|
RNS.log("Error stripping attachments from LXM file: "+str(e), RNS.LOG_ERROR)
|
|
|
|
def to_index_entry(self):
|
|
return {
|
|
"timestamp": self.timestamp,
|
|
"sort_timestamp": self.sort_timestamp,
|
|
"state": self._cached_state,
|
|
"title": self._cached_title,
|
|
"content": self._cached_content,
|
|
"source_hash": self._cached_source_hash,
|
|
"transport_encrypted": self._cached_transport_encrypted,
|
|
"transport_encryption": self._cached_transport_encryption,
|
|
"signature_validated": self._cached_signature_validated,
|
|
"unverified_reason": self._cached_unverified_reason,
|
|
"method": self._cached_method,
|
|
"has_attachments": self._cached_has_attachments,
|
|
"attachment_names": self._cached_attachment_names,
|
|
}
|
|
|
|
def restore_from_index(self, entry):
|
|
self.timestamp = entry.get("timestamp")
|
|
self.sort_timestamp = entry.get("sort_timestamp", self.sort_timestamp)
|
|
self._cached_state = entry.get("state")
|
|
self._cached_title = entry.get("title")
|
|
self._cached_content = entry.get("content")
|
|
self._cached_source_hash = entry.get("source_hash")
|
|
self._cached_transport_encrypted = entry.get("transport_encrypted")
|
|
self._cached_transport_encryption = entry.get("transport_encryption")
|
|
self._cached_signature_validated = entry.get("signature_validated")
|
|
self._cached_unverified_reason = entry.get("unverified_reason")
|
|
self._cached_method = entry.get("method")
|
|
self._cached_has_attachments = entry.get("has_attachments")
|
|
self._cached_attachment_names = entry.get("attachment_names")
|
|
|
|
@staticmethod
|
|
def read_index(conversation_path):
|
|
index_path = os.path.join(conversation_path, ".index")
|
|
if os.path.isfile(index_path):
|
|
try:
|
|
with open(index_path, "rb") as f:
|
|
return msgpack.unpackb(f.read(), raw=False)
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
@staticmethod
|
|
def write_index(conversation_path, messages):
|
|
index_path = os.path.join(conversation_path, ".index")
|
|
index = {}
|
|
if os.path.isfile(index_path):
|
|
try:
|
|
with open(index_path, "rb") as f:
|
|
index = msgpack.unpackb(f.read(), raw=False)
|
|
except Exception:
|
|
index = {}
|
|
|
|
for msg in messages:
|
|
if msg._cached_state is not None:
|
|
filename = os.path.basename(msg.file_path)
|
|
index[filename] = msg.to_index_entry()
|
|
|
|
try:
|
|
with open(index_path, "wb") as f:
|
|
f.write(msgpack.packb(index))
|
|
except Exception as e:
|
|
RNS.log("Error writing conversation index: "+str(e), RNS.LOG_ERROR)
|