Merge pull request #88 from RFnexus/master

Conversations improvements and more from @RFnexus
This commit is contained in:
markqvist 2026-04-06 18:15:15 +02:00 committed by GitHub
commit ce8eec1d4b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 1268 additions and 89 deletions

View file

@ -2,6 +2,7 @@ import os
import RNS
import LXMF
import shutil
import msgpack
import nomadnet
from nomadnet.Directory import DirectoryEntry
@ -66,20 +67,29 @@ class Conversation:
ingested_path = lxmessage.write_to_directory(conversation_path)
try:
ConversationMessage.extract_attachments_from_lxm(lxmessage, app)
except Exception as e:
RNS.log("Error extracting attachments: "+str(e), RNS.LOG_ERROR)
if RNS.hexrep(source_hash, delimit=False) in Conversation.cached_conversations:
conversation = Conversation.cached_conversations[RNS.hexrep(source_hash, delimit=False)]
conversation.scan_storage()
if not source_hash in Conversation.unread_conversations:
Conversation.unread_conversations[source_hash] = True
try:
dirname = RNS.hexrep(source_hash, delimit=False)
open(app.conversationpath + "/" + dirname + "/unread", 'a').close()
except Exception as e:
pass
if source_hash in Conversation.unread_conversations:
Conversation.unread_conversations[source_hash] += 1
else:
Conversation.unread_conversations[source_hash] = 1
if Conversation.created_callback != None:
Conversation.created_callback()
try:
dirname = RNS.hexrep(source_hash, delimit=False)
with open(app.conversationpath + "/" + dirname + "/unread", "w") as uf:
uf.write(str(Conversation.unread_conversations[source_hash]))
except Exception as e:
pass
if Conversation.created_callback != None:
Conversation.created_callback()
return ingested_path
@ -94,12 +104,17 @@ class Conversation:
app_data = RNS.Identity.recall_app_data(source_hash)
display_name = app.directory.display_name(source_hash)
unread = False
unread = 0
if source_hash in Conversation.unread_conversations:
unread = True
unread = Conversation.unread_conversations[source_hash]
elif os.path.isfile(app.conversationpath + "/" + dirname + "/unread"):
Conversation.unread_conversations[source_hash] = True
unread = True
try:
with open(app.conversationpath + "/" + dirname + "/unread", "r") as uf:
content = uf.read().strip()
unread = int(content) if content else 1
except Exception:
unread = 1
Conversation.unread_conversations[source_hash] = unread
if display_name == None and app_data:
display_name = LXMF.display_name_from_app_data(app_data)
@ -110,14 +125,20 @@ class Conversation:
sort_name = display_name
trust_level = app.directory.trust_level(source_hash, display_name)
entry = (source_hash_text, display_name, trust_level, sort_name, unread)
conversation_dir = app.conversationpath + "/" + dirname
try:
last_activity = os.path.getmtime(conversation_dir)
except Exception:
last_activity = 0
entry = (source_hash_text, display_name, trust_level, sort_name, unread, last_activity)
conversations.append(entry)
except Exception as e:
RNS.log("Error while loading conversation "+str(dirname)+", skipping it. The contained exception was: "+str(e), RNS.LOG_ERROR)
conversations.sort(key=lambda e: (-e[2], e[3], e[0]), reverse=False)
conversations.sort(key=lambda e: e[5], reverse=True)
return conversations
@ -172,14 +193,46 @@ class Conversation:
def scan_storage(self):
old_len = len(self.messages)
existing = {}
for msg in self.messages:
existing[msg.file_path] = msg
index = ConversationMessage.read_index(self.messages_path)
self.messages = []
for filename in os.listdir(self.messages_path):
if len(filename) == RNS.Identity.HASHLENGTH//8*2:
message_path = self.messages_path + "/" + filename
self.messages.append(ConversationMessage(message_path))
if message_path in existing:
old_msg = existing[message_path]
try:
current_mtime = os.path.getmtime(message_path)
if current_mtime > old_msg.sort_timestamp:
old_msg._cached_state = None
old_msg._cached_method = None
old_msg.sort_timestamp = current_mtime
except Exception:
pass
self.messages.append(old_msg)
else:
msg = ConversationMessage(message_path)
if filename in index:
msg.restore_from_index(index[filename])
self.messages.append(msg)
new_len = len(self.messages)
needs_index_update = []
for msg in self.messages:
filename = os.path.basename(msg.file_path)
if msg._cached_state is not None:
if filename not in index:
needs_index_update.append(msg)
elif "content" not in index[filename]:
needs_index_update.append(msg)
if needs_index_update:
ConversationMessage.write_index(self.messages_path, needs_index_update)
if new_len > old_len:
self.unread = True
@ -208,7 +261,7 @@ class Conversation:
def register_changed_callback(self, callback):
self.__changed_callback = callback
def send(self, content="", title=""):
def send(self, content="", title="", fields=None):
if self.send_destination:
dest = self.send_destination
source = self.app.lxmf_destination
@ -225,7 +278,7 @@ class Conversation:
if self.app.directory.trust_level(dest.hash) == DirectoryEntry.TRUSTED:
dest_is_trusted = True
lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method, include_ticket=dest_is_trusted)
lxm = LXMF.LXMessage(dest, source, content, title=title, fields=fields, desired_method=desired_method, include_ticket=dest_is_trusted)
lxm.register_delivery_callback(self.message_notification)
lxm.register_failed_callback(self.message_notification)
@ -330,9 +383,32 @@ class ConversationMessage:
self.timestamp = None
self.lxm = None
self._cached_hash = None
self._cached_state = None
self._cached_title = None
self._cached_content = None
self._cached_source_hash = None
self._cached_transport_encrypted = None
self._cached_transport_encryption = None
self._cached_signature_validated = None
self._cached_unverified_reason = None
self._cached_method = None
self._cached_has_attachments = None
self._cached_attachment_names = None
self.sort_timestamp = os.path.getmtime(file_path) if os.path.isfile(file_path) else 0
filename = os.path.basename(file_path)
if len(filename) == RNS.Identity.HASHLENGTH//8*2:
try:
self._cached_hash = bytes.fromhex(filename)
except Exception:
pass
def load(self):
try:
self.lxm = LXMF.LXMessage.unpack_from_file(open(self.file_path, "rb"))
with open(self.file_path, "rb") as lxm_file:
self.lxm = LXMF.LXMessage.unpack_from_file(lxm_file)
self.loaded = True
self.timestamp = self.lxm.timestamp
self.sort_timestamp = os.path.getmtime(self.file_path)
@ -351,6 +427,78 @@ class ConversationMessage:
if not found:
self.lxm.state = LXMF.LXMessage.FAILED
if self._cached_hash is None:
self._cached_hash = self.lxm.hash
self._cached_state = self.lxm.state
if self._cached_source_hash is None:
self._cached_source_hash = self.lxm.source_hash
self._cached_transport_encrypted = self.lxm.transport_encrypted
self._cached_transport_encryption = self.lxm.transport_encryption
if self._cached_signature_validated is None:
self._cached_signature_validated = self.lxm.signature_validated
self._cached_method = self.lxm.method
if self._cached_unverified_reason is None and hasattr(self.lxm, "unverified_reason"):
self._cached_unverified_reason = self.lxm.unverified_reason
self._cached_title = self.lxm.title_as_string()
self._cached_content = self.lxm.content_as_string()
if self._cached_has_attachments is None:
found_in_fields = False
fields = None
if hasattr(self.lxm, "get_fields"):
fields = self.lxm.get_fields()
if fields and isinstance(fields, dict):
found_in_fields = (
LXMF.FIELD_FILE_ATTACHMENTS in fields
or LXMF.FIELD_IMAGE in fields
or LXMF.FIELD_AUDIO in fields
)
if found_in_fields:
names = []
file_atts = fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
for att in file_atts:
if isinstance(att, list) and len(att) >= 2:
size = len(att[1]) if isinstance(att[1], bytes) else 0
names.append(("file", str(att[0]), size))
if LXMF.FIELD_IMAGE in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_IMAGE])
if data:
size = len(data)
ext = ConversationMessage._ext_from_media_format(fmt, data)
names.append(("file", "image"+ext, size))
if LXMF.FIELD_AUDIO in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_AUDIO])
if data:
size = len(data)
ext = ConversationMessage._ext_from_media_format(fmt, data, is_audio=True)
names.append(("file", "audio"+ext, size))
self._cached_has_attachments = True
self._cached_attachment_names = names
if not found_in_fields:
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
manifest = self._read_attachment_manifest(att_dir)
if manifest and "files" in manifest and len(manifest["files"]) > 0:
names = []
for entry in manifest["files"]:
names.append(("file", entry["name"], entry.get("size", 0)))
self._cached_has_attachments = True
self._cached_attachment_names = names
else:
self._cached_has_attachments = False
self._cached_attachment_names = []
else:
self._cached_has_attachments = False
self._cached_attachment_names = []
try:
app = nomadnet.NomadNetworkApp.get_shared_instance()
ConversationMessage.extract_attachments_from_lxm(self.lxm, app)
ConversationMessage.strip_attachments_from_file(self.file_path, app)
except Exception:
pass
except Exception as e:
RNS.log("Error while loading LXMF message "+str(self.file_path)+" from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)
@ -364,60 +512,395 @@ class ConversationMessage:
os.unlink(self.file_path)
def get_timestamp(self):
if self.timestamp is not None:
return self.timestamp
if not self.loaded:
self.load()
return self.timestamp
def get_title(self):
if self._cached_title is not None:
return self._cached_title
if not self.loaded:
self.load()
return self.lxm.title_as_string()
return self._cached_title if self._cached_title is not None else ""
def get_content(self):
if self._cached_content is not None:
return self._cached_content
if not self.loaded:
self.load()
return self.lxm.content_as_string()
return self._cached_content if self._cached_content is not None else ""
def get_hash(self):
if self._cached_hash is not None:
return self._cached_hash
if not self.loaded:
self.load()
return self.lxm.hash
return self._cached_hash
def get_state(self):
if self._cached_state is not None:
return self._cached_state
if not self.loaded:
self.load()
return self.lxm.state
return self._cached_state
def get_transport_encryption(self):
if self._cached_transport_encryption is not None:
return self._cached_transport_encryption
if not self.loaded:
self.load()
return self.lxm.transport_encryption
return self._cached_transport_encryption
def get_transport_encrypted(self):
if self._cached_transport_encrypted is not None:
return self._cached_transport_encrypted
if not self.loaded:
self.load()
return self.lxm.transport_encrypted
return self._cached_transport_encrypted
def signature_validated(self):
if self._cached_signature_validated is not None:
return self._cached_signature_validated
if not self.loaded:
self.load()
return self.lxm.signature_validated
return self._cached_signature_validated
def get_signature_description(self):
if self.signature_validated():
return "Signature Verified"
else:
if self.lxm.unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN:
reason = self._cached_unverified_reason
if reason == LXMF.LXMessage.SOURCE_UNKNOWN:
return "Unknown Origin"
elif self.lxm.unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID:
elif reason == LXMF.LXMessage.SIGNATURE_INVALID:
return "Invalid Signature"
else:
return "Unknown signature validation failure"
return "Unknown signature validation failure"
def get_fields(self):
if not self.loaded:
self.load()
if self.lxm and hasattr(self.lxm, "get_fields"):
fields = self.lxm.get_fields()
if fields and isinstance(fields, dict):
return fields
return {}
def has_attachments(self):
if self._cached_has_attachments is not None:
return self._cached_has_attachments
fields = self.get_fields()
return (
LXMF.FIELD_FILE_ATTACHMENTS in fields
or LXMF.FIELD_IMAGE in fields
or LXMF.FIELD_AUDIO in fields
)
def get_file_attachments(self):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
manifest = self._read_attachment_manifest(att_dir)
if manifest and "files" in manifest:
result = []
for entry in manifest["files"]:
fpath = os.path.join(att_dir, entry["stored_name"])
if os.path.isfile(fpath):
with open(fpath, "rb") as f:
result.append([entry["name"], f.read()])
return result
fields = self.get_fields()
return fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
def get_image(self):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
fpath = os.path.join(att_dir, "image")
if os.path.isfile(fpath):
with open(fpath, "rb") as f:
return f.read()
fields = self.get_fields()
return fields.get(LXMF.FIELD_IMAGE, None)
def get_audio(self):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
fpath = os.path.join(att_dir, "audio")
if os.path.isfile(fpath):
with open(fpath, "rb") as f:
return f.read()
fields = self.get_fields()
return fields.get(LXMF.FIELD_AUDIO, None)
def get_attachment_file_path(self, field_type, field_index=0):
att_dir = self._attachment_dir()
if att_dir and os.path.isdir(att_dir):
manifest = self._read_attachment_manifest(att_dir)
if manifest and "files" in manifest and field_index < len(manifest["files"]):
return os.path.join(att_dir, manifest["files"][field_index]["stored_name"])
# Fallback for old extraction format
if field_type == "image":
fpath = os.path.join(att_dir, "image")
if os.path.isfile(fpath):
return fpath
elif field_type == "audio":
fpath = os.path.join(att_dir, "audio")
if os.path.isfile(fpath):
return fpath
return None
def _attachment_dir(self):
try:
app = nomadnet.NomadNetworkApp.get_shared_instance()
msg_hash = self.get_hash()
if msg_hash:
return os.path.join(app.attachmentpath, RNS.hexrep(msg_hash, delimit=False))
except Exception:
pass
return None
def _read_attachment_manifest(self, att_dir):
manifest_path = os.path.join(att_dir, "manifest")
if os.path.isfile(manifest_path):
try:
with open(manifest_path, "rb") as f:
return msgpack.unpackb(f.read(), raw=False)
except Exception:
pass
return None
@staticmethod
def extract_attachments_from_lxm(lxmessage, app):
if not hasattr(lxmessage, "get_fields"):
return
fields = lxmessage.get_fields()
if not fields or not isinstance(fields, dict):
return
has_any = (
LXMF.FIELD_FILE_ATTACHMENTS in fields
or LXMF.FIELD_IMAGE in fields
or LXMF.FIELD_AUDIO in fields
)
if not has_any:
return
msg_hash_hex = RNS.hexrep(lxmessage.hash, delimit=False)
att_dir = os.path.join(app.attachmentpath, msg_hash_hex)
if os.path.isdir(att_dir):
return
os.makedirs(att_dir)
manifest = {"files": []}
file_attachments = fields.get(LXMF.FIELD_FILE_ATTACHMENTS, [])
if file_attachments:
for idx, att in enumerate(file_attachments):
if isinstance(att, list) and len(att) >= 2:
filename = str(att[0])
data = att[1] if isinstance(att[1], bytes) else b""
stored_name = "file_"+str(idx)
with open(os.path.join(att_dir, stored_name), "wb") as f:
f.write(data)
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
if LXMF.FIELD_IMAGE in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_IMAGE])
if data:
ext = ConversationMessage._ext_from_media_format(fmt, data)
filename = "image" + ext
stored_name = "file_"+str(len(manifest["files"]))
with open(os.path.join(att_dir, stored_name), "wb") as f:
f.write(data)
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
if LXMF.FIELD_AUDIO in fields:
fmt, data = ConversationMessage._unpack_media_field(fields[LXMF.FIELD_AUDIO])
if data:
ext = ConversationMessage._ext_from_media_format(fmt, data, is_audio=True)
filename = "audio" + ext
stored_name = "file_"+str(len(manifest["files"]))
with open(os.path.join(att_dir, stored_name), "wb") as f:
f.write(data)
manifest["files"].append({"name": filename, "stored_name": stored_name, "size": len(data)})
with open(os.path.join(att_dir, "manifest"), "wb") as f:
f.write(msgpack.packb(manifest))
@staticmethod
def _unpack_media_field(field_data):
"""Normalize FIELD_IMAGE/FIELD_AUDIO which can be raw bytes or [format, bytes].
Format element can be a string ('webp') or integer (LXMF audio mode constant)."""
if isinstance(field_data, bytes):
return None, field_data
elif isinstance(field_data, list) and len(field_data) >= 2 and isinstance(field_data[1], bytes):
return field_data[0], field_data[1]
return None, None
@staticmethod
def _detect_image_ext(data):
if not isinstance(data, bytes) or len(data) < 12:
return ".bin"
if data[:8] == b'\x89PNG\r\n\x1a\n':
return ".png"
elif data[:3] == b'\xff\xd8\xff':
return ".jpg"
elif data[:4] == b'GIF8':
return ".gif"
elif data[:4] == b'RIFF' and data[8:12] == b'WEBP':
return ".webp"
elif data[:4] == b'\x00\x00\x00\x1c' or data[:4] == b'\x00\x00\x00\x18':
return ".heic"
return ".bin"
@staticmethod
def _detect_audio_ext(data):
if not isinstance(data, bytes) or len(data) < 12:
return ".bin"
if data[:4] == b'OggS':
return ".ogg"
elif data[:2] == b'\xff\xfb' or data[:3] == b'ID3':
return ".mp3"
elif data[:4] == b'RIFF' and data[8:12] == b'WAVE':
return ".wav"
elif data[:4] == b'fLaC':
return ".flac"
return ".bin"
@staticmethod
def _ext_from_media_format(fmt, data, is_audio=False):
"""Derive file extension from format identifier and data.
fmt can be a string ('webp'), an integer (LXMF audio mode), or None."""
if isinstance(fmt, str) and len(fmt) > 0:
return "." + fmt.lower().strip(".")
if isinstance(fmt, int) and is_audio:
if fmt >= 16 and fmt <= 25:
return ".ogg"
elif fmt >= 1 and fmt <= 9:
return ".c2"
if is_audio:
return ConversationMessage._detect_audio_ext(data)
return ConversationMessage._detect_image_ext(data)
@staticmethod
def strip_attachments_from_file(file_path, app):
try:
with open(file_path, "rb") as f:
container = msgpack.unpackb(f.read(), strict_map_key=False)
lxmf_bytes = container[b"lxmf_bytes"] if b"lxmf_bytes" in container else container.get("lxmf_bytes")
if lxmf_bytes is None:
return
dest_len = LXMF.LXMessage.DESTINATION_LENGTH
sig_len = LXMF.LXMessage.SIGNATURE_LENGTH
header_len = 2 * dest_len + sig_len
header = lxmf_bytes[:header_len]
payload_bytes = lxmf_bytes[header_len:]
payload = msgpack.unpackb(payload_bytes, strict_map_key=False)
if len(payload) < 4 or not isinstance(payload[3], dict):
return
fields = payload[3]
attachment_keys = [LXMF.FIELD_FILE_ATTACHMENTS, LXMF.FIELD_IMAGE, LXMF.FIELD_AUDIO]
if not any(k in fields for k in attachment_keys):
return
# Compute message hash matching LXMF's logic
if len(payload) > 4:
hash_payload = msgpack.packb(payload[:4])
else:
hash_payload = payload_bytes
msg_hash = RNS.Identity.full_hash(lxmf_bytes[:2*dest_len] + hash_payload)
msg_hash_hex = RNS.hexrep(msg_hash, delimit=False)
att_dir = os.path.join(app.attachmentpath, msg_hash_hex)
if not os.path.isdir(att_dir):
return
for k in attachment_keys:
if k in fields:
del fields[k]
new_lxmf_bytes = header + msgpack.packb(payload)
key = b"lxmf_bytes" if b"lxmf_bytes" in container else "lxmf_bytes"
container[key] = new_lxmf_bytes
with open(file_path, "wb") as f:
f.write(msgpack.packb(container))
except Exception as e:
RNS.log("Error stripping attachments from LXM file: "+str(e), RNS.LOG_ERROR)
def to_index_entry(self):
return {
"timestamp": self.timestamp,
"sort_timestamp": self.sort_timestamp,
"state": self._cached_state,
"title": self._cached_title,
"content": self._cached_content,
"source_hash": self._cached_source_hash,
"transport_encrypted": self._cached_transport_encrypted,
"transport_encryption": self._cached_transport_encryption,
"signature_validated": self._cached_signature_validated,
"unverified_reason": self._cached_unverified_reason,
"method": self._cached_method,
"has_attachments": self._cached_has_attachments,
"attachment_names": self._cached_attachment_names,
}
def restore_from_index(self, entry):
self.timestamp = entry.get("timestamp")
self.sort_timestamp = entry.get("sort_timestamp", self.sort_timestamp)
self._cached_state = entry.get("state")
self._cached_title = entry.get("title")
self._cached_content = entry.get("content")
self._cached_source_hash = entry.get("source_hash")
self._cached_transport_encrypted = entry.get("transport_encrypted")
self._cached_transport_encryption = entry.get("transport_encryption")
self._cached_signature_validated = entry.get("signature_validated")
self._cached_unverified_reason = entry.get("unverified_reason")
self._cached_method = entry.get("method")
self._cached_has_attachments = entry.get("has_attachments")
self._cached_attachment_names = entry.get("attachment_names")
@staticmethod
def read_index(conversation_path):
index_path = os.path.join(conversation_path, ".index")
if os.path.isfile(index_path):
try:
with open(index_path, "rb") as f:
return msgpack.unpackb(f.read(), raw=False)
except Exception:
pass
return {}
@staticmethod
def write_index(conversation_path, messages):
index_path = os.path.join(conversation_path, ".index")
index = {}
if os.path.isfile(index_path):
try:
with open(index_path, "rb") as f:
index = msgpack.unpackb(f.read(), raw=False)
except Exception:
index = {}
for msg in messages:
if msg._cached_state is not None:
filename = os.path.basename(msg.file_path)
index[filename] = msg.to_index_entry()
try:
with open(index_path, "wb") as f:
f.write(msgpack.packb(index))
except Exception as e:
RNS.log("Error writing conversation index: "+str(e), RNS.LOG_ERROR)

View file

@ -107,6 +107,7 @@ class NomadNetworkApp:
self.directorypath = self.configdir+"/storage/directory"
self.peersettingspath = self.configdir+"/storage/peersettings"
self.tmpfilespath = self.configdir+"/storage/tmp"
self.attachmentpath = self.configdir+"/storage/attachments"
self.pagespath = self.configdir+"/storage/pages"
self.filespath = self.configdir+"/storage/files"
@ -114,6 +115,7 @@ class NomadNetworkApp:
self.examplespath = self.configdir+"/examples"
self.downloads_path = os.path.expanduser("~/Downloads")
self.attachment_save_path = None
self.firstrun = False
self.should_run_jobs = True
@ -165,6 +167,9 @@ class NomadNetworkApp:
if not os.path.isdir(self.tmpfilespath):
os.makedirs(self.tmpfilespath)
if not os.path.isdir(self.attachmentpath):
os.makedirs(self.attachmentpath)
else:
self.clear_tmp_dir()
@ -736,6 +741,10 @@ class NomadNetworkApp:
value = self.config["client"]["downloads_path"]
self.downloads_path = os.path.expanduser(value)
if option == "attachment_save_path":
value = self.config["client"]["attachment_save_path"]
self.attachment_save_path = os.path.expanduser(value)
if option == "announce_at_start":
value = self.config["client"].as_bool(option)
self.peer_announce_at_start = value
@ -1059,6 +1068,11 @@ destination = file
enable_client = yes
user_interface = text
downloads_path = ~/Downloads
# Where to save received attachments. If not set,
# attachments will be saved to the downloads path.
# attachment_save_path = ~/Downloads
notify_on_new_message = yes
# By default, the peer is announced at startup

View file

@ -146,6 +146,9 @@ GLYPHS = {
("qrcode", "QR", "\u25a4", "\uf029"),
("selected", "[*] ", "\u25CF", "\u25CF"),
("unselected", "[ ] ", "\u25CB", "\u25CB"),
("file", "[F]", "\u25a4", "\uf15b"),
("image", "[I]", "\u25a3", "\uf1c5"),
("audio", "[~]", "\u266b", "\uf1c7"),
}
class TextUI:

View file

@ -1,26 +1,64 @@
import RNS
import os
import shutil
import time
import nomadnet
import LXMF
import urwid
from datetime import datetime
from datetime import datetime, timedelta
from nomadnet.Directory import DirectoryEntry
from nomadnet.Conversation import ConversationMessage
def relative_time(timestamp):
now = time.time()
delta = now - timestamp
if delta < 0:
return "just now"
elif delta < 60:
return "just now"
elif delta < 3600:
m = int(delta / 60)
return str(m)+"m ago"
elif delta < 86400:
h = int(delta / 3600)
return str(h)+"h ago"
elif delta < 172800:
return "yesterday"
elif delta < 604800:
d = int(delta / 86400)
return str(d)+"d ago"
elif delta < 2592000:
w = int(delta / 604800)
return str(w)+"w ago"
else:
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")
def _format_size(size):
if size < 1024:
return str(size)+" B"
elif size < 1048576:
return str(round(size/1024, 1))+" KB"
else:
return str(round(size/1048576, 1))+" MB"
from nomadnet.vendor.additional_urwid_widgets import IndicativeListBox
class ConversationListDisplayShortcuts():
def __init__(self, app):
self.app = app
self.widget = urwid.AttrMap(urwid.Text("[C-e] Peer Info [C-x] Delete [C-r] Sync [C-n] New [C-u] Ingest URI [C-g] Fullscreen"), "shortcutbar")
self.widget = urwid.AttrMap(urwid.Text("[C-e] Peer Info [C-x] Delete [C-r] Sync [C-n] New [C-u] Ingest URI [C-o] Sort [C-g] Fullscreen"), "shortcutbar")
class ConversationDisplayShortcuts():
def __init__(self, app):
self.app = app
self.widget = urwid.AttrMap(urwid.Text("[C-d] Send [C-p] Paper Msg [C-t] Title [C-k] Clear [C-w] Close [C-u] Purge [C-x] Clear History [C-o] Sort"), "shortcutbar")
self.widget = urwid.AttrMap(urwid.Text("[C-d] Send [C-p] Paper Msg [C-t] Title [C-a] Attach [C-s] Save [C-k] Clear [C-w] Close [C-u] Purge [C-x] Clear History [C-o] Sort"), "shortcutbar")
class ConversationsArea(urwid.LineBox):
def keypress(self, size, key):
@ -36,6 +74,8 @@ class ConversationsArea(urwid.LineBox):
self.delegate.sync_conversations()
elif key == "ctrl g":
self.delegate.toggle_fullscreen()
elif key == "ctrl o":
self.delegate.toggle_list_sort()
elif key == "tab":
self.delegate.app.ui.main_display.frame.focus_position = "header"
elif key == "up" and (self.delegate.ilb.first_item_is_selected() or self.delegate.ilb.body_is_empty()):
@ -46,7 +86,11 @@ class ConversationsArea(urwid.LineBox):
class DialogLineBox(urwid.LineBox):
def keypress(self, size, key):
if key == "esc":
self.delegate.update_conversation_list()
if hasattr(self.delegate, "update_conversation_list"):
self.delegate.update_conversation_list()
elif hasattr(self.delegate, "dialog_active"):
self.delegate.dialog_active = False
self.delegate.conversation_changed(None)
else:
return super(DialogLineBox, self).keypress(size, key)
@ -55,11 +99,15 @@ class ConversationsDisplay():
given_list_width = 52
cached_conversation_widgets = {}
SORT_RECENT = 0
SORT_NAME = 1
def __init__(self, app):
self.app = app
self.dialog_open = False
self.sync_dialog = None
self.currently_displayed_conversation = None
self.list_sort_mode = ConversationsDisplay.SORT_RECENT
def disp_list_shortcuts(sender, arg1, arg2):
self.shortcuts_display = self.list_shortcuts
@ -85,16 +133,23 @@ class ConversationsDisplay():
nomadnet.Conversation.created_callback = self.update_conversation_list
def focus_change_event(self):
# This hack corrects buggy styling behaviour in IndicativeListBox
if not self.dialog_open:
ilb_position = self.ilb.get_selected_position()
self.update_conversation_list()
if ilb_position != None:
self.ilb.select_item(ilb_position)
def toggle_list_sort(self):
if self.list_sort_mode == ConversationsDisplay.SORT_RECENT:
self.list_sort_mode = ConversationsDisplay.SORT_NAME
else:
self.list_sort_mode = ConversationsDisplay.SORT_RECENT
self.update_conversation_list()
def update_listbox(self):
conversations = self.app.conversations()
if self.list_sort_mode == ConversationsDisplay.SORT_NAME:
conversations.sort(key=lambda e: (e[3].lower(), e[0]))
conversation_list_widgets = []
for conversation in self.app.conversations():
for conversation in conversations:
conversation_list_widgets.append(self.conversation_list_widget(conversation))
self.list_widgets = conversation_list_widgets
@ -706,9 +761,13 @@ class ConversationsDisplay():
pass
def update_conversation_list(self):
ilb_position = self.ilb.get_selected_position()
selected_hash = None
selected_item = self.ilb.get_selected_item()
if selected_item is not None:
if hasattr(selected_item, "source_hash"):
selected_hash = selected_item.source_hash
self.update_listbox()
# options = self.columns_widget.options(urwid.WEIGHT, ConversationsDisplay.list_width)
options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
if not (self.dialog_open and self.sync_dialog != None):
self.columns_widget.contents[0] = (self.listbox, options)
@ -726,8 +785,11 @@ class ConversationsDisplay():
)
self.columns_widget.contents[0] = (overlay, options)
if ilb_position != None:
self.ilb.select_item(ilb_position)
if selected_hash is not None:
for idx, widget in enumerate(self.list_widgets):
if widget.source_hash == selected_hash:
self.ilb.select_item(idx)
break
nomadnet.NomadNetworkApp.get_shared_instance().ui.loop.draw_screen()
if self.app.ui.main_display.sub_displays.active_display == self.app.ui.main_display.sub_displays.conversations_display:
@ -803,10 +865,11 @@ class ConversationsDisplay():
def conversation_list_widget(self, conversation):
trust_level = conversation[2]
display_name = conversation[1]
source_hash = conversation[0]
unread = conversation[4]
trust_level = conversation[2]
display_name = conversation[1]
source_hash = conversation[0]
unread = conversation[4]
last_activity = conversation[5]
g = self.app.ui.glyphs
@ -842,9 +905,15 @@ class ConversationsDisplay():
if trust_level != DirectoryEntry.UNTRUSTED:
if unread:
if source_hash != self.currently_displayed_conversation:
display_text += " "+g["unread"]
if unread > 1:
display_text += " "+g["unread"]+" ("+str(unread)+")"
else:
display_text += " "+g["unread"]
if last_activity > 0:
display_text += "\n "+relative_time(last_activity)
widget = ListEntry(display_text)
urwid.connect_signal(widget, "click", self.display_conversation, conversation[0])
display_widget = urwid.AttrMap(widget, style, focus_style)
@ -893,6 +962,10 @@ class MessageEdit(urwid.Edit):
self.delegate.send_message()
elif key == "ctrl p":
self.delegate.paper_message()
elif key == "ctrl a":
self.delegate.attach_file()
elif key == "ctrl s":
self.delegate.save_focused_attachments()
elif key == "ctrl k":
self.delegate.clear_editor()
elif key == "up":
@ -913,7 +986,9 @@ class MessageEdit(urwid.Edit):
class ConversationFrame(urwid.Frame):
def keypress(self, size, key):
if self.focus_position == "body":
if key == "up" and self.delegate.messagelist.top_is_visible:
if getattr(self.delegate, "dialog_active", False) or getattr(self.delegate, "dialog_open", False):
return super(ConversationFrame, self).keypress(size, key)
elif key == "up" and self.delegate.messagelist.top_is_visible:
nomadnet.NomadNetworkApp.get_shared_instance().ui.main_display.frame.focus_position = "header"
elif key == "down" and self.delegate.messagelist.bottom_is_visible:
self.focus_position = "footer"
@ -941,6 +1016,8 @@ class ConversationWidget(urwid.WidgetWrap):
self.message_widgets = []
self.sort_by_timestamp = False
self.updating_message_widgets = False
self.pending_attachments = []
self.dialog_active = False
self.update_message_widgets()
@ -956,13 +1033,17 @@ class ConversationWidget(urwid.WidgetWrap):
msg_editor.delegate = self
msg_editor.name = "content_editor"
header = None
self.peer_info_widget = urwid.AttrMap(urwid.Text(""), "msg_header_sent")
self._update_peer_info()
header_widgets = [self.peer_info_widget]
if self.conversation.trust_level == DirectoryEntry.UNTRUSTED:
header = urwid.AttrMap(
header_widgets.append(urwid.AttrMap(
urwid.Padding(
urwid.Text(g["warning"]+" Warning: Conversation with untrusted peer "+g["warning"], align=urwid.CENTER)),
"msg_warning_untrusted",
)
))
header = urwid.Pile(header_widgets)
self.minimal_editor = urwid.AttrMap(msg_editor, "msg_editor")
self.minimal_editor.name = "minimal_editor"
@ -1001,6 +1082,40 @@ class ConversationWidget(urwid.WidgetWrap):
super().__init__(self.display_widget)
def _update_peer_info(self):
g = self.app.ui.glyphs
source_hash_bytes = bytes.fromhex(self.source_hash)
display_name = self.app.directory.display_name(source_hash_bytes)
app_data = None
if display_name is None or self.app.message_router.get_outbound_stamp_cost(source_hash_bytes) is None:
app_data = RNS.Identity.recall_app_data(source_hash_bytes)
if display_name is None:
if app_data:
display_name = LXMF.display_name_from_app_data(app_data)
if display_name is None:
display_name = RNS.prettyhexrep(source_hash_bytes)
stamp_cost = self.app.message_router.get_outbound_stamp_cost(source_hash_bytes)
if stamp_cost is None and app_data:
stamp_cost = LXMF.stamp_cost_from_app_data(app_data)
hops = RNS.Transport.hops_to(source_hash_bytes)
if hops >= RNS.Transport.PATHFINDER_M:
hops_str = "unknown"
else:
hops_str = str(hops)+" hop" + ("s" if hops != 1 else "")
right_parts = []
if stamp_cost is not None:
right_parts.append("Stamp: "+str(stamp_cost))
right_parts.append(g["speed"]+hops_str)
left = " "+display_name
right = " ".join(right_parts)+" "
self.peer_info_widget.original_widget.set_text(left+" | "+right)
def clear_history_dialog(self):
def dismiss_dialog(sender):
self.dialog_open = False
@ -1039,20 +1154,38 @@ class ConversationWidget(urwid.WidgetWrap):
self.frame.contents["body"] = (overlay, self.frame.options())
self.frame.focus_position = "body"
def _build_footer(self):
g = self.app.ui.glyphs
if self.full_editor_active:
editor = self.full_editor
else:
editor = self.minimal_editor
if self.pending_attachments:
attachment_texts = []
for path in self.pending_attachments:
attachment_texts.append(os.path.basename(path))
indicator = urwid.AttrMap(
urwid.Text(g["file"]+" "+str(len(self.pending_attachments))+" file(s): "+", ".join(attachment_texts)),
"msg_header_sent",
)
return urwid.Pile([indicator, editor])
else:
return editor
def toggle_editor(self):
if self.full_editor_active:
self.frame.contents["footer"] = (self.minimal_editor, None)
self.full_editor_active = False
else:
self.frame.contents["footer"] = (self.full_editor, None)
self.full_editor_active = True
self.frame.contents["footer"] = (self._build_footer(), None)
def check_editor_allowed(self):
g = self.app.ui.glyphs
if self.frame:
allowed = nomadnet.NomadNetworkApp.get_shared_instance().directory.is_known(bytes.fromhex(self.source_hash))
if allowed:
self.frame.contents["footer"] = (self.minimal_editor, None)
self.frame.contents["footer"] = (self._build_footer(), None)
else:
warning = urwid.AttrMap(
urwid.Padding(urwid.Text(
@ -1096,10 +1229,16 @@ class ConversationWidget(urwid.WidgetWrap):
elif key == "ctrl o":
self.sort_by_timestamp ^= True
self.conversation_changed(None)
elif key == "ctrl a":
self.attach_file()
elif key == "ctrl s":
self.save_focused_attachments()
else:
return super(ConversationWidget, self).keypress(size, key)
def conversation_changed(self, conversation):
if hasattr(self, "peer_info_widget"):
self._update_peer_info()
self.update_message_widgets(replace = True)
def update_message_widgets(self, replace = False):
@ -1109,13 +1248,25 @@ class ConversationWidget(urwid.WidgetWrap):
self.updating_message_widgets = True
self.message_widgets = []
added_hashes = []
needs_index = []
for message in self.conversation.messages:
message_hash = message.get_hash()
if not message_hash in added_hashes:
added_hashes.append(message_hash)
was_loaded = message.loaded
message_widget = LXMessageWidget(message)
self.message_widgets.append(message_widget)
if not was_loaded and message.loaded:
needs_index.append(message)
message.unload()
if needs_index:
try:
ConversationMessage.write_index(
self.conversation.messages_path, needs_index)
except Exception:
pass
if self.sort_by_timestamp:
self.message_widgets.sort(key=lambda m: m.timestamp, reverse=False)
else:
@ -1134,15 +1285,155 @@ class ConversationWidget(urwid.WidgetWrap):
def clear_editor(self):
self.content_editor.set_edit_text("")
self.title_editor.set_edit_text("")
self.pending_attachments = []
self.frame.contents["footer"] = (self._build_footer(), None)
def _collect_attachment_refs(self):
g = self.app.ui.glyphs
refs = []
sorted_messages = sorted(self.conversation.messages, key=lambda m: m.sort_timestamp, reverse=True)
for conv_message in sorted_messages:
if not conv_message.has_attachments():
continue
cached_names = conv_message._cached_attachment_names or []
att_file_idx = 0
for atype, aname, *arest in cached_names:
asize = arest[0] if arest else 0
glyph = g["file"] if atype == "file" else g[atype]
label = glyph+" "+aname
if asize > 0:
label += " ("+_format_size(asize)+")"
if atype == "file":
refs.append((label, aname, conv_message, "file", att_file_idx))
att_file_idx += 1
else:
refs.append((label, aname, conv_message, atype, 0))
return refs
def save_focused_attachments(self):
g = self.app.ui.glyphs
self.dialog_active = True
try:
attachment_items = self._collect_attachment_refs()
except Exception as e:
RNS.log("Error collecting attachments: "+str(e), RNS.LOG_ERROR)
attachment_items = []
save_dir = self.app.attachment_save_path if self.app.attachment_save_path else self.app.downloads_path
def dismiss_dialog(sender):
self.dialog_active = False
self.conversation_changed(None)
if not attachment_items:
dialog = DialogLineBox(
urwid.Pile([
urwid.Text("No attachments in this conversation.\n"),
urwid.Columns([
(urwid.WEIGHT, 0.6, urwid.Text("")),
(urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=dismiss_dialog)),
])
]), title="Attachments"
)
dialog.delegate = self
bottom = self.messagelist
overlay = urwid.Overlay(dialog, bottom, align=urwid.CENTER, width=45, valign=urwid.MIDDLE, height=urwid.PACK, left=2, right=2)
self.frame.contents["body"] = (overlay, self.frame.options())
self.frame.focus_position = "body"
return
checkboxes = []
for label, filename, conv_msg, field_type, field_index in attachment_items:
cb = urwid.CheckBox(label, state=False)
cb._attachment_filename = filename
cb._conv_message = conv_msg
cb._field_type = field_type
cb._field_index = field_index
checkboxes.append(cb)
status_text = urwid.Text("")
def do_save(sender):
saved = []
errors = []
for cb in checkboxes:
if cb.get_state():
try:
src_path = cb._conv_message.get_attachment_file_path(cb._field_type, cb._field_index)
if src_path and os.path.isfile(src_path):
path = _copy_attachment_to_dest(cb._attachment_filename, src_path)
saved.append(path)
except Exception as e:
errors.append(str(e))
if saved:
lines = [g["check"]+" Copied "+str(len(saved))+" file(s) to "+save_dir+":"]
for p in saved:
lines.append(" "+os.path.basename(p))
if errors:
lines.append(g["cross"]+" "+str(len(errors))+" failed")
status_text.set_text("\n".join(lines))
elif errors:
status_text.set_text(g["cross"]+" Failed: "+errors[0])
else:
status_text.set_text("No files selected")
dialog_widgets = list(checkboxes)
dialog_widgets.append(urwid.Divider(g["divider1"]))
dialog_widgets.append(urwid.Text("Copy to: "+save_dir))
dialog_widgets.append(status_text)
dialog_widgets.append(urwid.Text(""))
dialog_widgets.append(urwid.Columns([
(urwid.WEIGHT, 0.45, urwid.Button("Copy to Downloads", on_press=do_save)),
(urwid.WEIGHT, 0.1, urwid.Text("")),
(urwid.WEIGHT, 0.45, urwid.Button("Close", on_press=dismiss_dialog)),
]))
dialog = DialogLineBox(urwid.ListBox(urwid.SimpleFocusListWalker(dialog_widgets)), title="Attachments")
dialog.delegate = self
bottom = self.messagelist
overlay = urwid.Overlay(dialog, bottom, align=urwid.CENTER, width=("relative", 80), valign=urwid.MIDDLE, height=("relative", 80), left=2, right=2)
self.frame.contents["body"] = (overlay, self.frame.options())
self.frame.focus_position = "body"
def send_message(self):
content = self.content_editor.get_edit_text()
title = self.title_editor.get_edit_text()
if not content == "":
if self.conversation.send(content, title):
fields = None
if self.pending_attachments:
file_attachments = []
for file_path in self.pending_attachments:
try:
with open(file_path, "rb") as af:
file_data = af.read()
file_name = os.path.basename(file_path)
file_attachments.append([file_name, file_data])
except Exception as e:
RNS.log("Error reading attachment "+str(file_path)+": "+str(e), RNS.LOG_ERROR)
if file_attachments:
fields = {LXMF.FIELD_FILE_ATTACHMENTS: file_attachments}
if self.conversation.send(content, title, fields=fields):
self.clear_editor()
else:
pass
def attach_file(self):
self.dialog_active = True
browser = FileBrowserDialog(self)
bottom = self.messagelist
overlay = urwid.Overlay(browser, bottom, align=urwid.CENTER, width=("relative", 90), valign=urwid.MIDDLE, height=("relative", 80), left=2, right=2)
self.frame.contents["body"] = (overlay, self.frame.options())
self.frame.focus_position = "body"
def file_browser_closed(self):
self.dialog_active = False
self.frame.contents["footer"] = (self._build_footer(), None)
self.conversation_changed(None)
def paper_message_saved(self, path):
g = self.app.ui.glyphs
@ -1275,55 +1566,443 @@ class LXMessageWidget(urwid.WidgetWrap):
g = app.ui.glyphs
self.timestamp = message.get_timestamp()
self.sort_timestamp = message.sort_timestamp
self.transfer_done = False
self._live_lxm = None
msg_hash = message.get_hash()
msg_state = message.get_state()
msg_source_hash = message._cached_source_hash
msg_method = message._cached_method
time_format = app.time_format
message_time = datetime.fromtimestamp(self.timestamp)
encryption_string = ""
if message.get_transport_encrypted():
encryption_string = " ["+g["encrypted"]+" "+str(message.get_transport_encryption())+"]"
encryption_string = " "+g["encrypted"]
else:
encryption_string = " ["+g["plaintext"]+" "+str(message.get_transport_encryption())+"]"
title_string = message_time.strftime(time_format)+encryption_string
encryption_string = " "+g["plaintext"]
if app.lxmf_destination.hash == message.lxm.source_hash:
if message.lxm.state == LXMF.LXMessage.DELIVERED:
title_string = relative_time(self.timestamp)+" | "+message_time.strftime(time_format)+encryption_string
is_outbound = False
if msg_source_hash is None:
header_style = "msg_header_failed"
title_string = g["warning"]+" "+title_string
elif app.lxmf_destination.hash == msg_source_hash:
is_outbound = True
if msg_state == LXMF.LXMessage.DELIVERED:
header_style = "msg_header_delivered"
title_string = g["check"]+" "+title_string
elif message.lxm.state == LXMF.LXMessage.FAILED:
title_string = g["check"]+" "+g["arrow_r"]+" "+title_string
elif msg_state == LXMF.LXMessage.FAILED:
header_style = "msg_header_failed"
title_string = g["cross"]+" "+title_string
elif message.lxm.method == LXMF.LXMessage.PROPAGATED and message.lxm.state == LXMF.LXMessage.SENT:
title_string = g["cross"]+" "+g["arrow_r"]+" "+title_string
elif msg_state == LXMF.LXMessage.REJECTED:
header_style = "msg_header_failed"
title_string = g["cross"]+" "+g["arrow_r"]+" Rejected "+title_string
elif msg_method == LXMF.LXMessage.PROPAGATED and msg_state == LXMF.LXMessage.SENT:
header_style = "msg_header_propagated"
title_string = g["sent"]+" "+title_string
elif message.lxm.method == LXMF.LXMessage.PAPER and message.lxm.state == LXMF.LXMessage.PAPER:
title_string = g["sent"]+" "+g["arrow_r"]+" "+title_string
elif msg_method == LXMF.LXMessage.PAPER and msg_state == LXMF.LXMessage.PAPER:
header_style = "msg_header_propagated"
title_string = g["papermsg"]+" "+title_string
elif message.lxm.state == LXMF.LXMessage.SENT:
title_string = g["papermsg"]+" "+g["arrow_r"]+" "+title_string
elif msg_state == LXMF.LXMessage.SENT:
header_style = "msg_header_sent"
title_string = g["sent"]+" "+title_string
title_string = g["sent"]+" "+g["arrow_r"]+" "+title_string
else:
header_style = "msg_header_sent"
title_string = g["arrow_r"]+" "+title_string
else:
if message.signature_validated():
header_style = "msg_header_ok"
title_string = g["check"]+" "+title_string
title_string = g["check"]+" "+g["arrow_l"]+" "+title_string
else:
header_style = "msg_header_caution"
title_string = g["warning"]+" "+message.get_signature_description() + "\n " + title_string
title_string = g["warning"]+" "+g["arrow_l"]+" "+message.get_signature_description() + "\n " + title_string
if message.get_title() != "":
title_string += " | " + message.get_title()
has_attachments = message.has_attachments()
cached_names = message._cached_attachment_names or []
if has_attachments and cached_names:
attachment_strings = []
for atype, aname, *arest in cached_names:
attachment_strings.append(g[atype if atype != "file" else "file"]+" "+aname)
title_string += " | " + " ".join(attachment_strings)
title = urwid.AttrMap(urwid.Text(title_string), header_style)
display_widget = urwid.Pile([
title,
urwid.Text(message.get_content()),
urwid.Text("")
self.progress_widget = urwid.Text("")
self.progress_attr = urwid.AttrMap(self.progress_widget, "progress_full")
content_text = message.get_content()
content_lines = content_text.split("\n")
indented = "\n".join(" "+line for line in content_lines)
pile_widgets = [title]
if is_outbound and msg_state is not None and msg_state < LXMF.LXMessage.SENT and msg_hash is not None:
try:
for pending in app.message_router.pending_outbound:
if pending.hash == msg_hash:
if pending.representation == LXMF.LXMessage.RESOURCE:
self._live_lxm = pending
break
except Exception:
pass
if self._live_lxm is not None:
pct = int(self._live_lxm.progress * 100)
bar_width = 20
filled = int(bar_width * self._live_lxm.progress)
if app.ui.colormode >= 256:
bar = "\u2588" * filled + "\u2591" * (bar_width - filled)
else:
bar = "#" * filled + "-" * (bar_width - filled)
self.progress_widget.set_text(" ["+bar+"] "+str(pct)+"%")
pile_widgets.append(self.progress_attr)
self._start_progress_poll()
pile_widgets.append(urwid.Text(indented))
if has_attachments and cached_names:
att_file_idx = 0
for atype, aname, *arest in cached_names:
glyph = g["file"] if atype == "file" else g[atype]
asize = arest[0] if arest else 0
label = " "+glyph+" "+aname
if asize > 0:
label += " ("+_format_size(asize)+")"
if atype == "file":
pile_widgets.append(ClickableAttachment(label, aname, message, "file", att_file_idx))
att_file_idx += 1
else:
pile_widgets.append(ClickableAttachment(label, aname, message, atype))
pile_widgets.append(urwid.Text(""))
super().__init__(urwid.Pile(pile_widgets))
def _start_progress_poll(self):
try:
loop = nomadnet.NomadNetworkApp.get_shared_instance().ui.loop
if loop:
loop.set_alarm_in(0.3, self._poll_progress)
except Exception:
pass
def _poll_progress(self, loop=None, user_data=None):
if self.transfer_done:
return
if self._live_lxm is None:
self.transfer_done = True
return
app = nomadnet.NomadNetworkApp.get_shared_instance()
g = app.ui.glyphs
progress = self._live_lxm.progress
state = self._live_lxm.state
pct = int(progress * 100)
if state == LXMF.LXMessage.FAILED:
self.progress_widget.set_text(" "+g["cross"]+" Transfer failed")
self.transfer_done = True
self._live_lxm = None
elif state == LXMF.LXMessage.REJECTED:
self.progress_widget.set_text(" "+g["cross"]+" Rejected: too large or not accepted")
self.transfer_done = True
self._live_lxm = None
elif state >= LXMF.LXMessage.SENT:
self.progress_widget.set_text("")
self.transfer_done = True
self._live_lxm = None
else:
bar_width = 20
filled = int(bar_width * progress)
if app.ui.colormode >= 256:
bar = "\u2588" * filled + "\u2591" * (bar_width - filled)
else:
bar = "#" * filled + "-" * (bar_width - filled)
self.progress_widget.set_text(" ["+bar+"] "+str(pct)+"%")
if not self.transfer_done:
try:
ui_loop = app.ui.loop
if ui_loop:
ui_loop.set_alarm_in(0.3, self._poll_progress)
ui_loop.draw_screen()
except Exception:
pass
class ClickableAttachment(urwid.Text):
def __init__(self, label, filename, conv_message, field_type, field_index=0):
self.filename = filename
self.conv_message = conv_message
self.field_type = field_type
self.field_index = field_index
self.saved = False
super().__init__(label)
def mouse_event(self, size, event, button, x, y, focus):
if button == 1 and urwid.util.is_mouse_press(event):
self._save()
return True
return False
def _save(self):
if self.saved:
return
app = nomadnet.NomadNetworkApp.get_shared_instance()
g = app.ui.glyphs
try:
src_path = self.conv_message.get_attachment_file_path(self.field_type, self.field_index)
if src_path and os.path.isfile(src_path):
save_path = _copy_attachment_to_dest(self.filename, src_path)
else:
if self.field_type == "file":
attachments = self.conv_message.get_file_attachments()
if self.field_index < len(attachments):
att = attachments[self.field_index]
if isinstance(att, list) and len(att) >= 2:
data = att[1] if isinstance(att[1], bytes) else b""
else:
data = b""
else:
data = b""
elif self.field_type == "image":
data = self.conv_message.get_image()
data = data if isinstance(data, bytes) else b""
elif self.field_type == "audio":
data = self.conv_message.get_audio()
data = data if isinstance(data, bytes) else b""
else:
data = b""
self.conv_message.unload()
if not data:
return
save_path = _save_attachment_to_disk(self.filename, data)
self.saved = True
self.set_text(" "+g["check"]+" Copied to: "+save_path)
except Exception as e:
RNS.log("Error saving attachment: "+str(e), RNS.LOG_ERROR)
self.set_text(" "+g["cross"]+" Save failed: "+str(e))
def _copy_attachment_to_dest(filename, src_path):
app = nomadnet.NomadNetworkApp.get_shared_instance()
save_dir = app.attachment_save_path if app.attachment_save_path else app.downloads_path
if not os.path.isdir(save_dir):
os.makedirs(save_dir)
save_path = os.path.join(save_dir, filename)
counter = 0
base, ext = os.path.splitext(filename)
while os.path.isfile(save_path):
counter += 1
save_path = os.path.join(save_dir, base+"_"+str(counter)+ext)
shutil.copy2(src_path, save_path)
return save_path
def _save_attachment_to_disk(filename, data):
app = nomadnet.NomadNetworkApp.get_shared_instance()
save_dir = app.attachment_save_path if app.attachment_save_path else app.downloads_path
if not os.path.isdir(save_dir):
os.makedirs(save_dir)
save_path = os.path.join(save_dir, filename)
counter = 0
base, ext = os.path.splitext(filename)
while os.path.isfile(save_path):
counter += 1
save_path = os.path.join(save_dir, base+"_"+str(counter)+ext)
with open(save_path, "wb") as f:
f.write(data)
return save_path
class FileBrowserEntry(urwid.WidgetWrap):
signals = ["click"]
def __init__(self, name, full_path, is_dir=False, is_parent=False, selected=False):
self.full_path = full_path
self.name = name
self.is_dir = is_dir
self.is_parent = is_parent
self.selected = selected
g = nomadnet.NomadNetworkApp.get_shared_instance().ui.glyphs
if is_parent:
display = g["arrow_l"]+" .."
elif is_dir:
display = g["arrow_r"]+" "+name+"/"
elif selected:
display = g["check"]+" "+name
else:
display = " "+name
self.text_widget = urwid.SelectableIcon(display, 0)
if is_dir or is_parent:
style = "list_trusted"
focus_style = "list_focus"
elif selected:
style = "list_trusted"
focus_style = "list_focus_trusted"
else:
style = "list_unknown"
focus_style = "list_focus"
display_widget = urwid.AttrMap(self.text_widget, style, focus_style)
super().__init__(display_widget)
def keypress(self, size, key):
if key == "enter":
self._emit("click")
else:
return key
def mouse_event(self, size, event, button, x, y, focus):
if button == 1 and urwid.util.is_mouse_press(event):
self._emit("click")
return True
return False
class FileBrowserDialog(urwid.WidgetWrap):
def __init__(self, delegate):
self.delegate = delegate
app = nomadnet.NomadNetworkApp.get_shared_instance()
self.g = app.ui.glyphs
self.current_path = os.path.expanduser("~")
self.path_label = urwid.Text("")
self.status_label = urwid.Text("")
self.file_walker = urwid.SimpleFocusListWalker([])
self.file_listbox = urwid.ListBox(self.file_walker)
self.button_columns = urwid.Columns([
(urwid.WEIGHT, 0.45, urwid.Button("Done", on_press=self._dismiss)),
(urwid.WEIGHT, 0.1, urwid.Text("")),
(urwid.WEIGHT, 0.45, urwid.Button("Cancel", on_press=self._cancel)),
])
super().__init__(display_widget)
header_pile = urwid.Pile([
self.path_label,
self.status_label,
urwid.Divider(self.g["divider1"]),
])
footer_pile = urwid.Pile([
urwid.Divider(self.g["divider1"]),
self.button_columns,
])
self._populate()
self.browser_frame = urwid.Frame(
self.file_listbox,
header=header_pile,
footer=footer_pile,
)
linebox = urwid.LineBox(self.browser_frame, title="Attach File")
super().__init__(linebox)
def _update_status(self):
pending = self.delegate.pending_attachments
if pending:
names = [os.path.basename(p) for p in pending]
self.status_label.set_text(" "+self.g["file"]+" "+str(len(pending))+" selected: "+", ".join(names))
else:
self.status_label.set_text(" No files selected")
def _populate(self):
self.path_label.set_text(" "+self.current_path)
self._update_status()
focus_pos = None
try:
focus_pos = self.file_listbox.focus_position
except Exception:
pass
entries = []
parent = os.path.dirname(self.current_path)
if parent != self.current_path:
entry = FileBrowserEntry("..", parent, is_parent=True)
urwid.connect_signal(entry, "click", self._entry_clicked, entry)
entries.append(entry)
try:
items = sorted(os.listdir(self.current_path))
except PermissionError:
entries.append(urwid.Text(("error_text", " Permission denied")))
self.file_walker[:] = entries
return
dirs = []
files = []
for item in items:
if item.startswith("."):
continue
full = os.path.join(self.current_path, item)
if os.path.isdir(full):
dirs.append((item, full))
elif os.path.isfile(full):
files.append((item, full))
for name, full in dirs:
entry = FileBrowserEntry(name, full, is_dir=True)
urwid.connect_signal(entry, "click", self._entry_clicked, entry)
entries.append(entry)
for name, full in files:
is_selected = full in self.delegate.pending_attachments
entry = FileBrowserEntry(name, full, selected=is_selected)
urwid.connect_signal(entry, "click", self._entry_clicked, entry)
entries.append(entry)
if not dirs and not files:
entries.append(urwid.Text(("inactive_text", " (empty)")))
self.file_walker[:] = entries
if focus_pos is not None and focus_pos < len(entries):
self.file_listbox.set_focus(focus_pos)
elif entries:
self.file_listbox.set_focus(0)
def _entry_clicked(self, entry_widget, user_data=None):
entry = user_data if user_data else entry_widget
if entry.is_dir or entry.is_parent:
self.current_path = entry.full_path
self._populate()
else:
if entry.full_path in self.delegate.pending_attachments:
self.delegate.pending_attachments.remove(entry.full_path)
else:
self.delegate.pending_attachments.append(entry.full_path)
self.delegate.frame.contents["footer"] = (self.delegate._build_footer(), None)
self._populate()
def _dismiss(self, sender):
self.delegate.file_browser_closed()
def _cancel(self, sender):
self.delegate.pending_attachments.clear()
self.delegate.frame.contents["footer"] = (self.delegate._build_footer(), None)
self.delegate.file_browser_closed()
def keypress(self, size, key):
if key == "esc":
self.delegate.file_browser_closed()
return
result = super().keypress(size, key)
if result == "down" and self.browser_frame.focus_position == "body":
self.browser_frame.focus_position = "footer"
return
elif result == "up" and self.browser_frame.focus_position == "footer":
self.browser_frame.focus_position = "body"
return
return result
class SyncProgressBar(urwid.ProgressBar):
def get_text(self):