markqvist___NomadNet/nomadnet/Conversation.py
2026-04-07 13:37:15 +02:00

906 lines
37 KiB
Python

import os
import RNS
import LXMF
import shutil
import RNS.vendor.umsgpack as msgpack
import nomadnet
from nomadnet.Directory import DirectoryEntry
class Conversation:
cached_conversations = {}
unread_conversations = {}
created_callback = None
aspect_filter = "lxmf.delivery"
@staticmethod
def received_announce(destination_hash, announced_identity, app_data):
app = nomadnet.NomadNetworkApp.get_shared_instance()
if not destination_hash in app.ignored_list:
destination_hash_text = RNS.hexrep(destination_hash, delimit=False)
# Check if the announced destination is in
# our list of conversations
if destination_hash_text in [e[0] for e in Conversation.conversation_list(app)]:
if app.directory.find(destination_hash):
if Conversation.created_callback != None:
Conversation.created_callback()
else:
if Conversation.created_callback != None:
Conversation.created_callback()
# This reformats the new v0.5.0 announce data back to the expected format
# for nomadnets storage and other handling functions.
dn = LXMF.display_name_from_app_data(app_data)
app_data = b""
if dn != None:
app_data = dn.encode("utf-8")
# Add the announce to the directory announce
# stream logger
app.directory.lxmf_announce_received(destination_hash, app_data)
else:
RNS.log("Ignored announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
@staticmethod
def query_for_peer(source_hash):
try:
RNS.Transport.request_path(bytes.fromhex(source_hash))
except Exception as e:
RNS.log("Error while querying network for peer identity. The contained exception was: "+str(e), RNS.LOG_ERROR)
@staticmethod
def ingest(lxmessage, app, originator = False, delegate = None):
if originator:
source_hash = lxmessage.destination_hash
else:
source_hash = lxmessage.source_hash
source_hash_path = RNS.hexrep(source_hash, delimit=False)
conversation_path = app.conversationpath + "/" + source_hash_path
if not os.path.isdir(conversation_path):
os.makedirs(conversation_path)
if Conversation.created_callback != None:
Conversation.created_callback()
ingested_path = lxmessage.write_to_directory(conversation_path)
try:
ConversationMessage.extract_attachments_from_lxm(lxmessage, app)
except Exception as e:
RNS.log("Error extracting attachments: "+str(e), RNS.LOG_ERROR)
if RNS.hexrep(source_hash, delimit=False) in Conversation.cached_conversations:
conversation = Conversation.cached_conversations[RNS.hexrep(source_hash, delimit=False)]
conversation.scan_storage()
if source_hash in Conversation.unread_conversations:
Conversation.unread_conversations[source_hash] += 1
else:
Conversation.unread_conversations[source_hash] = 1
try:
dirname = RNS.hexrep(source_hash, delimit=False)
with open(app.conversationpath + "/" + dirname + "/unread", "w") as uf:
uf.write(str(Conversation.unread_conversations[source_hash]))
except Exception as e:
pass
if Conversation.created_callback != None:
Conversation.created_callback()
return ingested_path
@staticmethod
def conversation_list(app):
conversations = []
for dirname in os.listdir(app.conversationpath):
if len(dirname) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2 and os.path.isdir(app.conversationpath + "/" + dirname):
try:
source_hash_text = dirname
source_hash = bytes.fromhex(dirname)
app_data = RNS.Identity.recall_app_data(source_hash)
display_name = app.directory.display_name(source_hash)
unread = 0
if source_hash in Conversation.unread_conversations:
unread = Conversation.unread_conversations[source_hash]
elif os.path.isfile(app.conversationpath + "/" + dirname + "/unread"):
try:
with open(app.conversationpath + "/" + dirname + "/unread", "r") as uf:
content = uf.read().strip()
unread = int(content) if content else 1
except Exception:
unread = 1
Conversation.unread_conversations[source_hash] = unread
if display_name == None and app_data:
display_name = LXMF.display_name_from_app_data(app_data)
if display_name == None:
sort_name = ""
else:
sort_name = display_name
trust_level = app.directory.trust_level(source_hash, display_name)
conversation_dir = app.conversationpath + "/" + dirname
try:
last_activity = os.path.getmtime(conversation_dir)
except Exception:
last_activity = 0
entry = (source_hash_text, display_name, trust_level, sort_name, unread, last_activity)
conversations.append(entry)
except Exception as e:
RNS.log("Error while loading conversation "+str(dirname)+", skipping it. The contained exception was: "+str(e), RNS.LOG_ERROR)
conversations.sort(key=lambda e: e[5], reverse=True)
return conversations
@staticmethod
def cache_conversation(conversation):
Conversation.cached_conversations[conversation.source_hash] = conversation
@staticmethod
def delete_conversation(source_hash_path, app):
conversation_path = app.conversationpath + "/" + source_hash_path
try:
if os.path.isdir(conversation_path):
shutil.rmtree(conversation_path)
except Exception as e:
RNS.log("Could not remove conversation at "+str(conversation_path)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
def __init__(self, source_hash, app, initiator=False):
self.app = app
self.source_hash = source_hash
self.send_destination = None
self.messages = []
self.messages_path = app.conversationpath + "/" + source_hash
self.messages_load_time = None
self.source_known = False
self.source_trusted = False
self.source_blocked = False
self.unread = False
self.__changed_callback = None
if not RNS.Identity.recall(bytes.fromhex(self.source_hash)):
RNS.Transport.request_path(bytes.fromhex(source_hash))
self.source_identity = RNS.Identity.recall(bytes.fromhex(self.source_hash))
if self.source_identity:
self.source_known = True
self.send_destination = RNS.Destination(self.source_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
if initiator:
if not os.path.isdir(self.messages_path):
os.makedirs(self.messages_path)
if Conversation.created_callback != None:
Conversation.created_callback()
self.scan_storage()
self.trust_level = app.directory.trust_level(bytes.fromhex(self.source_hash))
Conversation.cache_conversation(self)
def scan_storage(self):
old_len = len(self.messages)
existing = {}
for msg in self.messages:
existing[msg.file_path] = msg
index = ConversationMessage.read_index(self.messages_path)
self.messages = []
for filename in os.listdir(self.messages_path):
if len(filename) == RNS.Identity.HASHLENGTH//8*2:
message_path = self.messages_path + "/" + filename
if message_path in existing:
old_msg = existing[message_path]
try:
current_mtime = os.path.getmtime(message_path)
if current_mtime > old_msg.sort_timestamp:
old_msg._cached_state = None
old_msg._cached_method = None
old_msg.sort_timestamp = current_mtime
except Exception:
pass
self.messages.append(old_msg)
else:
msg = ConversationMessage(message_path)
if filename in index:
msg.restore_from_index(index[filename])
self.messages.append(msg)
new_len = len(self.messages)
needs_index_update = []
for msg in self.messages:
filename = os.path.basename(msg.file_path)
if msg._cached_state is not None:
if filename not in index:
needs_index_update.append(msg)
elif "content" not in index[filename]:
needs_index_update.append(msg)
if needs_index_update:
ConversationMessage.write_index(self.messages_path, needs_index_update)
if new_len > old_len:
self.unread = True
if self.__changed_callback != None:
self.__changed_callback(self)
def purge_failed(self):
purged_messages = []
for conversation_message in self.messages:
if conversation_message.get_state() == LXMF.LXMessage.FAILED:
purged_messages.append(conversation_message)
conversation_message.purge()
for purged_message in purged_messages:
self.messages.remove(purged_message)
def clear_history(self):
purged_messages = []
for conversation_message in self.messages:
purged_messages.append(conversation_message)
conversation_message.purge()
for purged_message in purged_messages:
self.messages.remove(purged_message)
def register_changed_callback(self, callback):
self.__changed_callback = callback
def send(self, content="", title="", fields=None):
if self.send_destination:
dest = self.send_destination
source = self.app.lxmf_destination
desired_method = LXMF.LXMessage.DIRECT
if self.app.directory.preferred_delivery(dest.hash) == DirectoryEntry.PROPAGATED:
if self.app.message_router.get_outbound_propagation_node() != None:
desired_method = LXMF.LXMessage.PROPAGATED
else:
if not self.app.message_router.delivery_link_available(dest.hash) and RNS.Identity.current_ratchet_id(dest.hash) != None:
RNS.log(f"Have ratchet for {RNS.prettyhexrep(dest.hash)}, requesting opportunistic delivery of message", RNS.LOG_DEBUG)
desired_method = LXMF.LXMessage.OPPORTUNISTIC
dest_is_trusted = False
if self.app.directory.trust_level(dest.hash) == DirectoryEntry.TRUSTED:
dest_is_trusted = True
lxm = LXMF.LXMessage(dest, source, content, title=title, fields=fields, desired_method=desired_method, include_ticket=dest_is_trusted)
lxm.register_delivery_callback(self.message_notification)
lxm.register_failed_callback(self.message_notification)
if self.app.message_router.get_outbound_propagation_node() != None:
lxm.try_propagation_on_fail = self.app.try_propagation_on_fail
self.app.message_router.handle_outbound(lxm)
message_path = Conversation.ingest(lxm, self.app, originator=True)
self.messages.append(ConversationMessage(message_path))
return True
else:
RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
return False
def paper_output(self, content="", title="", mode="print_qr"):
if self.send_destination:
try:
dest = self.send_destination
source = self.app.lxmf_destination
desired_method = LXMF.LXMessage.PAPER
lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method)
if mode == "print_qr":
qr_code = lxm.as_qr()
qr_tmp_path = self.app.tmpfilespath+"/"+str(RNS.hexrep(lxm.hash, delimit=False))
qr_code.save(qr_tmp_path)
print_result = self.app.print_file(qr_tmp_path)
os.unlink(qr_tmp_path)
if print_result:
message_path = Conversation.ingest(lxm, self.app, originator=True)
self.messages.append(ConversationMessage(message_path))
return print_result
elif mode == "save_qr":
qr_code = lxm.as_qr()
qr_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".png")
qr_code.save(qr_save_path)
message_path = Conversation.ingest(lxm, self.app, originator=True)
self.messages.append(ConversationMessage(message_path))
return qr_save_path
elif mode == "save_uri":
lxm_uri = lxm.as_uri()+"\n"
uri_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".txt")
with open(uri_save_path, "wb") as f:
f.write(lxm_uri.encode("utf-8"))
message_path = Conversation.ingest(lxm, self.app, originator=True)
self.messages.append(ConversationMessage(message_path))
return uri_save_path
elif mode == "return_uri":
return lxm.as_uri()
except Exception as e:
RNS.log("An error occurred while generating paper message, the contained exception was: "+str(e), RNS.LOG_ERROR)
return False
else:
RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
return False
def message_notification(self, message):
if message.state == LXMF.LXMessage.FAILED and hasattr(message, "try_propagation_on_fail") and message.try_propagation_on_fail:
if hasattr(message, "stamp_generation_failed") and message.stamp_generation_failed == True:
RNS.log(f"Could not send {message} due to a stamp generation failure", RNS.LOG_ERROR)
else:
RNS.log("Direct delivery of "+str(message)+" failed. Retrying as propagated message.", RNS.LOG_VERBOSE)
message.try_propagation_on_fail = None
message.delivery_attempts = 0
if hasattr(message, "next_delivery_attempt"):
del message.next_delivery_attempt
message.packed = None
message.desired_method = LXMF.LXMessage.PROPAGATED
self.app.message_router.handle_outbound(message)
else:
message_path = Conversation.ingest(message, self.app, originator=True)
def __str__(self):
string = self.source_hash
# TODO: Remove this
# if self.source_identity:
# if self.source_identity.app_data:
# # TODO: Sanitise for viewing, or just clean this
# string += " | "+self.source_identity.app_data.decode("utf-8")
return string
class ConversationMessage:
def __init__(self, file_path):
self.file_path = file_path
self.loaded = False
self.timestamp = None
self.lxm = None
self._cached_hash = None
self._cached_state = None
self._cached_title = None
self._cached_content = None
self._cached_source_hash = None
self._cached_transport_encrypted = None
self._cached_transport_encryption = None
self._cached_signature_validated = None
self._cached_unverified_reason = None
self._cached_method = None
self._cached_has_attachments = None
self._cached_attachment_names = None
self.sort_timestamp = os.path.getmtime(file_path) if os.path.isfile(file_path) else 0
filename = os.path.basename(file_path)
if len(filename) == RNS.Identity.HASHLENGTH//8*2:
try:
self._cached_hash = bytes.fromhex(filename)
except Exception:
pass
def load(self):
try:
with open(self.file_path, "rb") as lxm_file:
self.lxm = LXMF.LXMessage.unpack_from_file(lxm_file)
self.loaded = True
self.timestamp = self.lxm.timestamp
self.sort_timestamp = os.path.getmtime(self.file_path)
if self.lxm.state > LXMF.LXMessage.GENERATING and self.lxm.state < LXMF.LXMessage.SENT:
found = False
for pending in nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_outbound:
if pending.hash == self.lxm.hash:
found = True
for pending_id in nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_deferred_stamps:
if pending_id == self.lxm.hash:
found = True
if not found:
self.lxm.state = LXMF.LXMessage.FAILED
if self._cached_hash is None:
self._cached_hash = self.lxm.hash
self._cached_state = self.lxm.state
if self._cached_source_hash is None:
self._cached_source_hash = self.lxm.source_hash
self._cached_transport_encrypted = self.lxm.transport_encrypted
self._cached_transport_encryption = self.lxm.transport_encryption
if self._cached_signature_validated is None:
self._cached_signature_validated = self.lxm.signature_validated
self._cached_method = self.lxm.method
if self._cached_unverified_reason is None and hasattr(self.lxm, "unverified_reason"):
self._cached_unverified_reason = self.lxm.unverified_reason
self._cached_title = self.lxm.title_as_string()
self._cached_content = self.lxm.content_as_string()
if self._cached_has_attachments is None:
found_in_fields = False
fields = None
if hasattr(self.lxm, "get_fields"):
fields = self.lxm.get_fields()
if fields and isinstance(fields, dict):
found_in_fields = (
LXMF.FIELD_FILE_ATTACHMENTS in fields
or LXMF.FIELD_IMAGE in fields
or LXMF.FIELD_AUDIO in fields
)
if found_in_fields:
names = []
file_atts = fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
for att in file_atts:
if isinstance(att, list) and len(att) >= 2:
size = len(att[1]) if isinstance(att[1], bytes) else 0
names.append(("file", str(att[0]), size))
if LXMF.FIELD_IMAGE in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_IMAGE])
if data:
size = len(data)
ext = ConversationMessage._ext_from_media_format(fmt, data)
names.append(("file", "image"+ext, size))
if LXMF.FIELD_AUDIO in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_AUDIO])
if data:
size = len(data)
ext = ConversationMessage._ext_from_media_format(fmt, data, is_audio=True)
names.append(("file", "audio"+ext, size))
self._cached_has_attachments = True
self._cached_attachment_names = names
if not found_in_fields:
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
manifest = self._read_attachment_manifest(att_dir)
if manifest and "files" in manifest and len(manifest["files"]) > 0:
names = []
for entry in manifest["files"]:
names.append(("file", entry["name"], entry.get("size", 0)))
self._cached_has_attachments = True
self._cached_attachment_names = names
else:
self._cached_has_attachments = False
self._cached_attachment_names = []
else:
self._cached_has_attachments = False
self._cached_attachment_names = []
try:
app = nomadnet.NomadNetworkApp.get_shared_instance()
ConversationMessage.extract_attachments_from_lxm(self.lxm, app)
ConversationMessage.strip_attachments_from_file(self.file_path, app)
except Exception:
pass
except Exception as e:
RNS.log("Error while loading LXMF message "+str(self.file_path)+" from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)
def unload(self):
self.loaded = False
self.lxm = None
def purge(self):
self.unload()
if os.path.isfile(self.file_path):
os.unlink(self.file_path)
def get_timestamp(self):
if self.timestamp is not None:
return self.timestamp
if not self.loaded:
self.load()
return self.timestamp
def get_title(self):
if self._cached_title is not None:
return self._cached_title
if not self.loaded:
self.load()
return self._cached_title if self._cached_title is not None else ""
def get_content(self):
if self._cached_content is not None:
return self._cached_content
if not self.loaded:
self.load()
return self._cached_content if self._cached_content is not None else ""
def get_hash(self):
if self._cached_hash is not None:
return self._cached_hash
if not self.loaded:
self.load()
return self._cached_hash
def get_state(self):
if self._cached_state is not None:
return self._cached_state
if not self.loaded:
self.load()
return self._cached_state
def get_transport_encryption(self):
if self._cached_transport_encryption is not None:
return self._cached_transport_encryption
if not self.loaded:
self.load()
return self._cached_transport_encryption
def get_transport_encrypted(self):
if self._cached_transport_encrypted is not None:
return self._cached_transport_encrypted
if not self.loaded:
self.load()
return self._cached_transport_encrypted
def signature_validated(self):
if self._cached_signature_validated is not None:
return self._cached_signature_validated
if not self.loaded:
self.load()
return self._cached_signature_validated
def get_signature_description(self):
if self.signature_validated():
return "Signature Verified"
else:
reason = self._cached_unverified_reason
if reason == LXMF.LXMessage.SOURCE_UNKNOWN:
return "Unknown Origin"
elif reason == LXMF.LXMessage.SIGNATURE_INVALID:
return "Invalid Signature"
else:
return "Unknown signature validation failure"
def get_fields(self):
if not self.loaded:
self.load()
if self.lxm and hasattr(self.lxm, "get_fields"):
fields = self.lxm.get_fields()
if fields and isinstance(fields, dict):
return fields
return {}
def has_attachments(self):
if self._cached_has_attachments is not None:
return self._cached_has_attachments
fields = self.get_fields()
return (
LXMF.FIELD_FILE_ATTACHMENTS in fields
or LXMF.FIELD_IMAGE in fields
or LXMF.FIELD_AUDIO in fields
)
def get_file_attachments(self):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
manifest = self._read_attachment_manifest(att_dir)
if manifest and "files" in manifest:
result = []
for entry in manifest["files"]:
fpath = os.path.join(att_dir, entry["stored_name"])
if os.path.isfile(fpath):
with open(fpath, "rb") as f:
result.append([entry["name"], f.read()])
return result
fields = self.get_fields()
return fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
def get_image(self):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
fpath = os.path.join(att_dir, "image")
if os.path.isfile(fpath):
with open(fpath, "rb") as f:
return f.read()
fields = self.get_fields()
return fields.get(LXMF.FIELD_IMAGE, None)
def get_audio(self):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
fpath = os.path.join(att_dir, "audio")
if os.path.isfile(fpath):
with open(fpath, "rb") as f:
return f.read()
fields = self.get_fields()
return fields.get(LXMF.FIELD_AUDIO, None)
def get_attachment_file_path(self, field_type, field_index=0):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
manifest = self._read_attachment_manifest(att_dir)
if manifest and "files" in manifest and field_index < len(manifest["files"]):
return os.path.join(att_dir, manifest["files"][field_index]["stored_name"])
# Fallback for old extraction format
if field_type == "image":
fpath = os.path.join(att_dir, "image")
if os.path.isfile(fpath):
return fpath
elif field_type == "audio":
fpath = os.path.join(att_dir, "audio")
if os.path.isfile(fpath):
return fpath
return None
def _attachment_dir(self):
try:
app = nomadnet.NomadNetworkApp.get_shared_instance()
msg_hash = self.get_hash()
if msg_hash:
return os.path.join(app.attachmentpath, RNS.hexrep(msg_hash, delimit=False))
except Exception:
pass
return None
def _read_attachment_manifest(self, att_dir):
manifest_path = os.path.join(att_dir, "manifest")
if os.path.isfile(manifest_path):
try:
with open(manifest_path, "rb") as f:
return msgpack.unpackb(f.read(), raw=False)
except Exception:
pass
return None
@staticmethod
def extract_attachments_from_lxm(lxmessage, app):
if not hasattr(lxmessage, "get_fields"):
return
fields = lxmessage.get_fields()
if not fields or not isinstance(fields, dict):
return
has_any = (
LXMF.FIELD_FILE_ATTACHMENTS in fields
or LXMF.FIELD_IMAGE in fields
or LXMF.FIELD_AUDIO in fields
)
if not has_any:
return
msg_hash_hex = RNS.hexrep(lxmessage.hash, delimit=False)
att_dir = os.path.join(app.attachmentpath, msg_hash_hex)
if os.path.isdir(att_dir):
return
os.makedirs(att_dir)
manifest = {"files": []}
file_attachments = fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
if file_attachments:
for idx, att in enumerate(file_attachments):
if isinstance(att, list) and len(att) >= 2:
filename = str(att[0])
data = att[1] if isinstance(att[1], bytes) else b""
stored_name = "file_"+str(idx)
with open(os.path.join(att_dir, stored_name), "wb") as f:
f.write(data)
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
if LXMF.FIELD_IMAGE in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_IMAGE])
if data:
ext = ConversationMessage._ext_from_media_format(fmt, data)
filename = "image" + ext
stored_name = "file_"+str(len(manifest["files"]))
with open(os.path.join(att_dir, stored_name), "wb") as f:
f.write(data)
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
if LXMF.FIELD_AUDIO in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_AUDIO])
if data:
ext = ConversationMessage._ext_from_media_format(fmt, data, is_audio=True)
filename = "audio" + ext
stored_name = "file_"+str(len(manifest["files"]))
with open(os.path.join(att_dir, stored_name), "wb") as f:
f.write(data)
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
with open(os.path.join(att_dir, "manifest"), "wb") as f:
f.write(msgpack.packb(manifest))
@staticmethod
def _unpack_media_field(field_data):
"""Normalize FIELD_IMAGE/FIELD_AUDIO which can be raw bytes or [format, bytes].
Format element can be a string ('webp') or integer (LXMF audio mode constant)."""
if isinstance(field_data, bytes):
return None, field_data
elif isinstance(field_data, list) and len(field_data) >= 2 and isinstance(field_data[1], bytes):
return field_data[0], field_data[1]
return None, None
@staticmethod
def _detect_image_ext(data):
if not isinstance(data, bytes) or len(data) < 12:
return ".bin"
if data[:8] == b'\x89PNG\r\n\x1a\n':
return ".png"
elif data[:3] == b'\xff\xd8\xff':
return ".jpg"
elif data[:4] == b'GIF8':
return ".gif"
elif data[:4] == b'RIFF' and data[8:12] == b'WEBP':
return ".webp"
elif data[:4] == b'\x00\x00\x00\x1c' or data[:4] == b'\x00\x00\x00\x18':
return ".heic"
return ".bin"
@staticmethod
def _detect_audio_ext(data):
if not isinstance(data, bytes) or len(data) < 12:
return ".bin"
if data[:4] == b'OggS':
return ".ogg"
elif data[:2] == b'\xff\xfb' or data[:3] == b'ID3':
return ".mp3"
elif data[:4] == b'RIFF' and data[8:12] == b'WAVE':
return ".wav"
elif data[:4] == b'fLaC':
return ".flac"
return ".bin"
@staticmethod
def _ext_from_media_format(fmt, data, is_audio=False):
"""Derive file extension from format identifier and data.
fmt can be a string ('webp'), an integer (LXMF audio mode), or None."""
if isinstance(fmt, str) and len(fmt) > 0:
return "." + fmt.lower().strip(".")
if isinstance(fmt, int) and is_audio:
if fmt >= 16 and fmt <= 25:
return ".ogg"
elif fmt >= 1 and fmt <= 9:
return ".c2"
if is_audio:
return ConversationMessage._detect_audio_ext(data)
return ConversationMessage._detect_image_ext(data)
@staticmethod
def strip_attachments_from_file(file_path, app):
try:
with open(file_path, "rb") as f:
container = msgpack.unpackb(f.read(), strict_map_key=False)
lxmf_bytes = container[b"lxmf_bytes"] if b"lxmf_bytes" in container else container.get("lxmf_bytes")
if lxmf_bytes is None:
return
dest_len = LXMF.LXMessage.DESTINATION_LENGTH
sig_len = LXMF.LXMessage.SIGNATURE_LENGTH
header_len = 2 * dest_len + sig_len
header = lxmf_bytes[:header_len]
payload_bytes = lxmf_bytes[header_len:]
payload = msgpack.unpackb(payload_bytes, strict_map_key=False)
if len(payload) < 4 or not isinstance(payload[3], dict):
return
fields = payload[3]
attachment_keys = [LXMF.FIELD_FILE_ATTACHMENTS, LXMF.FIELD_IMAGE, LXMF.FIELD_AUDIO]
if not any(k in fields for k in attachment_keys):
return
# Compute message hash matching LXMF's logic
if len(payload) > 4:
hash_payload = msgpack.packb(payload[:4])
else:
hash_payload = payload_bytes
msg_hash = RNS.Identity.full_hash(lxmf_bytes[:2*dest_len] + hash_payload)
msg_hash_hex = RNS.hexrep(msg_hash, delimit=False)
att_dir = os.path.join(app.attachmentpath, msg_hash_hex)
if not os.path.isdir(att_dir):
return
for k in attachment_keys:
if k in fields:
del fields[k]
new_lxmf_bytes = header + msgpack.packb(payload)
key = b"lxmf_bytes" if b"lxmf_bytes" in container else "lxmf_bytes"
container[key] = new_lxmf_bytes
with open(file_path, "wb") as f:
f.write(msgpack.packb(container))
except Exception as e:
RNS.log("Error stripping attachments from LXM file: "+str(e), RNS.LOG_ERROR)
def to_index_entry(self):
return {
"timestamp": self.timestamp,
"sort_timestamp": self.sort_timestamp,
"state": self._cached_state,
"title": self._cached_title,
"content": self._cached_content,
"source_hash": self._cached_source_hash,
"transport_encrypted": self._cached_transport_encrypted,
"transport_encryption": self._cached_transport_encryption,
"signature_validated": self._cached_signature_validated,
"unverified_reason": self._cached_unverified_reason,
"method": self._cached_method,
"has_attachments": self._cached_has_attachments,
"attachment_names": self._cached_attachment_names,
}
def restore_from_index(self, entry):
self.timestamp = entry.get("timestamp")
self.sort_timestamp = entry.get("sort_timestamp", self.sort_timestamp)
self._cached_state = entry.get("state")
self._cached_title = entry.get("title")
self._cached_content = entry.get("content")
self._cached_source_hash = entry.get("source_hash")
self._cached_transport_encrypted = entry.get("transport_encrypted")
self._cached_transport_encryption = entry.get("transport_encryption")
self._cached_signature_validated = entry.get("signature_validated")
self._cached_unverified_reason = entry.get("unverified_reason")
self._cached_method = entry.get("method")
self._cached_has_attachments = entry.get("has_attachments")
self._cached_attachment_names = entry.get("attachment_names")
@staticmethod
def read_index(conversation_path):
index_path = os.path.join(conversation_path, ".index")
if os.path.isfile(index_path):
try:
with open(index_path, "rb") as f:
return msgpack.unpackb(f.read(), raw=False)
except Exception:
pass
return {}
@staticmethod
def write_index(conversation_path, messages):
index_path = os.path.join(conversation_path, ".index")
index = {}
if os.path.isfile(index_path):
try:
with open(index_path, "rb") as f:
index = msgpack.unpackb(f.read(), raw=False)
except Exception:
index = {}
for msg in messages:
if msg._cached_state is not None:
filename = os.path.basename(msg.file_path)
index[filename] = msg.to_index_entry()
try:
with open(index_path, "wb") as f:
f.write(msgpack.packb(index))
except Exception as e:
RNS.log("Error writing conversation index: "+str(e), RNS.LOG_ERROR)