From 1185b2d6b58c733cb9c0405679048edc91a6b931 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Sun, 29 Sep 2024 19:52:58 +1300 Subject: [PATCH 1/6] start implementing group chat server with membership management --- database.py | 35 ++++ meshchat.py | 197 ++++++++++++++++++++ src/backend/group_chat/group_chat_client.py | 97 ++++++++++ src/backend/group_chat/group_chat_server.py | 119 ++++++++++++ 4 files changed, 448 insertions(+) create mode 100644 src/backend/group_chat/group_chat_client.py create mode 100644 src/backend/group_chat/group_chat_server.py diff --git a/database.py b/database.py index c861556..9307658 100644 --- a/database.py +++ b/database.py @@ -123,3 +123,38 @@ class LxmfConversationReadState(BaseModel): # define table name class Meta: table_name = "lxmf_conversation_read_state" + + +class Group(BaseModel): + + id = BigAutoField() + destination_hash = CharField(unique=True) # unique destination hash + identity_private_key = TextField(null=True) # identity private key if we created and control this group + type = CharField() + public_display_name = CharField() + + created_at = DateTimeField(default=lambda: datetime.now(timezone.utc)) + updated_at = DateTimeField(default=lambda: datetime.now(timezone.utc)) + + # define table name + class Meta: + table_name = "groups" + + +class GroupMember(BaseModel): + + id = BigAutoField() + group_destination_hash = CharField() + member_identity_hash = CharField() + member_display_name = CharField() + + created_at = DateTimeField(default=lambda: datetime.now(timezone.utc)) + updated_at = DateTimeField(default=lambda: datetime.now(timezone.utc)) + + # define table name + class Meta: + table_name = "group_members" + constraints = [ + # only allow a single row per group_destination_hash/member_identity_hash pair + SQL('UNIQUE (group_destination_hash, member_identity_hash)'), + ] diff --git a/meshchat.py b/meshchat.py index 377668d..56ead62 100644 --- a/meshchat.py +++ b/meshchat.py @@ -23,6 +23,7 @@ from serial.tools import list_ports import database from src.backend.announce_handler import AnnounceHandler +from src.backend.group_chat.group_chat_server import GroupChatServer, GroupDataProviderInterface from src.backend.lxmf_message_fields import LxmfImageField, LxmfFileAttachmentsField, LxmfFileAttachment, LxmfAudioField from src.backend.audio_call_manager import AudioCall, AudioCallManager @@ -38,6 +39,79 @@ def get_file_path(filename): return os.path.join(datadir, filename) +class GroupChatDataProvider(GroupDataProviderInterface): + + def __init__(self): + pass + + # finds a group in the database for the provided destination hash + def find_group(self, group_destination_hash: bytes): + return database.Group.get_or_none(database.Group.destination_hash == group_destination_hash.hex()) + + # gets member count of a group + def get_member_count(self, group_destination_hash: bytes) -> int: + + # find group + group = self.find_group(group_destination_hash) + if group is None: + raise Exception("Group not found") + + # get group members count + group_members_count = database.GroupMember.select().where(database.GroupMember.group_destination_hash == group.destination_hash).count() + return group_members_count + + # check if a user is a member of a group + def is_member(self, group_destination_hash: bytes, identity_hash: bytes) -> bool: + + # find group + group = self.find_group(group_destination_hash) + if group is None: + raise Exception("Group not found") + + # find group member + group_member = database.GroupMember.get_or_none( + (database.GroupMember.group_destination_hash == group.destination_hash) + & (database.GroupMember.member_identity_hash == identity_hash.hex())) + + return group_member is not None + + # adds a member to a group + def add_member(self, group_destination_hash: bytes, identity_hash: bytes, display_name: str): + + # find group + group = self.find_group(group_destination_hash) + if group is None: + raise Exception("Group not found") + + # prepare data to insert or update + data = { + "group_destination_hash": group.destination_hash, + "member_identity_hash": identity_hash.hex(), + "member_display_name": display_name, + "updated_at": datetime.now(timezone.utc), + } + + # upsert group member to database + database.GroupMember.insert(data).on_conflict(conflict_target=[ + # one unique row per group_destination_hash/member_identity_hash pair + database.GroupMember.group_destination_hash, + database.GroupMember.member_identity_hash, + ], update=data).execute() + + # removes a member from a group + def remove_member(self, group_destination_hash: bytes, identity_hash: bytes): + + # find group + group = self.find_group(group_destination_hash) + if group is None: + raise Exception("Group not found") + + # remove group member from database + database.GroupMember.delete().where( + (database.GroupMember.group_destination_hash == group.destination_hash) + & (database.GroupMember.member_identity_hash == identity_hash.hex())).execute() + + class ReticulumMeshChat: def __init__(self, identity: RNS.Identity, storage_dir, reticulum_config_dir): @@ -72,6 +146,8 @@ class ReticulumMeshChat: database.Config, database.Announce, database.CustomDestinationDisplayName, + database.Group, + database.GroupMember, database.LxmfMessage, database.LxmfConversationReadState, ]) @@ -142,6 +218,22 @@ class ReticulumMeshChat: self.audio_call_manager = AudioCallManager(identity=self.identity) self.audio_call_manager.register_incoming_call_callback(self.on_incoming_audio_call) + # start group servers we control + self.group_chat_servers = [] + for group in database.Group.get(database.Group.identity_private_key.is_null(False)).select(): + + # load group identity private key from database + group_identity = RNS.Identity(create_keys=False) + group_identity.load_private_key(base64.b64decode(group.identity_private_key)) + + # init group chat server + self.group_chat_servers.append(GroupChatServer( + identity=group_identity, + data_provider=GroupChatDataProvider(), + group_type=group.type, + public_display_name=group.public_display_name, + )) + # start background thread for auto announce loop thread = threading.Thread(target=asyncio.run, args=(self.announce_loop(),)) thread.daemon = True @@ -892,6 +984,95 @@ class ReticulumMeshChat: "announces": announces, }) + # serve groups + @routes.get("/api/v1/groups") + async def index(request): + + # build groups database query + query = database.Group.select() + + # order groups by name asc + query_results = query.order_by(database.Group.public_display_name.asc()) + + # process groups + groups = [] + for group in query_results: + groups.append(self.convert_db_group_to_dict(group)) + + return web.json_response({ + "groups": groups, + }) + + # create group + @routes.post("/api/v1/groups/create") + async def index(request): + + # get request data + data = await request.json() + group_type = data.get('group_type') + public_display_name = data.get('public_display_name') + + # group type is required + # todo check group type is allowed + if group_type is None: + return web.json_response({ + "message": "group type is required", + }, status=422) + + # public display name is required + if public_display_name is None or public_display_name == "": + return web.json_response({ + "message": "public display name is required", + }, status=422) + + # create a new rns identity to manage this group + group_identity = RNS.Identity() + + # determine the destination hash for this new group + destination_hash = RNS.Destination.hash(group_identity, "meshchat", "group").hex() + + # create group in database + database.Group.insert({ + "destination_hash": destination_hash, + "identity_private_key": base64.b64encode(group_identity.get_private_key()).decode("utf-8"), + "type": group_type, + "public_display_name": public_display_name, + "created_at": datetime.now(timezone.utc), + "updated_at": datetime.now(timezone.utc), + }).execute() + + # find new group in database + database_group = database.Group.get_or_none(database.Group.destination_hash == destination_hash) + if database_group is None: + return web.json_response({ + "message": "failed to find group after creating it", + }, status=500) + + return web.json_response({ + "group": self.convert_db_group_to_dict(database_group), + }) + + # delete group + @routes.delete("/api/v1/groups/{destination_hash}") + async def index(request): + + # get path params + destination_hash = request.match_info.get("destination_hash", None) + + # destination hash is required + if destination_hash is None: + return web.json_response({ + "message": "destination hash is required", + }, status=422) + + # delete all database records for the group + database.Group.delete().where((database.Group.destination_hash == destination_hash)).execute() + database.GroupMember.delete().where((database.GroupMember.group_destination_hash == destination_hash)).execute() + + return web.json_response({ + "message": "Group has been deleted", + }) + # propagation node status @routes.get("/api/v1/lxmf/propagation-node/status") async def index(request): @@ -1401,6 +1582,11 @@ class ReticulumMeshChat: # send announce for audio call self.audio_call_manager.announce(app_data=self.config.display_name.get().encode("utf-8")) + # send announce for group chat servers + # todo allow groups to have independent announce schedules? + for group_chat_server in self.group_chat_servers: + group_chat_server.announce() + # tell websocket clients we just announced await self.send_announced_to_websocket_clients() @@ -1879,6 +2065,17 @@ class ReticulumMeshChat: "updated_at": announce.updated_at, } + # convert database group to a dictionary + def convert_db_group_to_dict(self, group: database.Group): + return { + "id": group.id, + "destination_hash": group.destination_hash, + "type": group.type, + "public_display_name": group.public_display_name, + "created_at": group.created_at, + "updated_at": group.updated_at, + } + # convert database lxmf message to a dictionary def convert_db_lxmf_message_to_dict(self, db_lxmf_message: database.LxmfMessage): diff --git a/src/backend/group_chat/group_chat_client.py b/src/backend/group_chat/group_chat_client.py new file mode 100644 index 0000000..acfb5cc --- /dev/null +++ b/src/backend/group_chat/group_chat_client.py @@ -0,0 +1,97 @@ +import asyncio +import json +import time + +import RNS + + +# a group chat client for interacting with group chat servers +class GroupChatClient: + + def __init__(self, group_destination_hash: bytes, user_identity: RNS.Identity): + self.group_destination_hash = group_destination_hash + self.user_identity = user_identity + self.link: RNS.Link | None = None + + # connect to group chat server + async def _connect(self, timeout_after_seconds: int = 15): + + # determine when to timeout connecting + timeout_after_seconds = time.time() + timeout_after_seconds + + # check if we have a path to the destination + if not RNS.Transport.has_path(self.group_destination_hash): + + # we don't have a path, so we need to request it + RNS.Transport.request_path(self.group_destination_hash) + + # wait until we have a path, or give up after the configured timeout + while not RNS.Transport.has_path(self.group_destination_hash) and time.time() < timeout_after_seconds: + await asyncio.sleep(0.1) + + # find destination identity from hash + destination_identity = RNS.Identity.recall(self.group_destination_hash) + if destination_identity is None: + + # we have to bail out since we don't have the identity/path yet + raise Exception("Could not find path to destination. Try again later.") + + # the group destination we will connect to + group_destination = RNS.Destination( + destination_identity, + RNS.Destination.OUT, + RNS.Destination.SINGLE, + "meshchat", + "group", + ) + + # create link to group destination + self.link = RNS.Link(group_destination) + + # wait until we have established a link, or give up after the configured timeout + while self.link.status is not RNS.Link.ACTIVE and time.time() < timeout_after_seconds: + await asyncio.sleep(0.1) + + # if we still haven't established a link, bail out + if self.link.status is not RNS.Link.ACTIVE: + raise Exception("Could not establish link to destination.") + + # send our identity to be able to perform queries + self.link.identify(self.user_identity) + + # makes a request over the link and returns an async response, or throws an exception on error + async def request(self, path: str, data: bytes | None = None): + + # create future + loop = asyncio.get_running_loop() + future = loop.create_future() + + # handle response + def response_callback(request_receipt): + loop.call_soon_threadsafe(future.set_result, request_receipt.response) + + # handle failure + def failed_callback(error): + loop.call_soon_threadsafe(future.set_exception, error) + + # if link is not active, connect now + if self.link is None or self.link.status != RNS.Link.ACTIVE: + await self._connect() + + # send request over link + self.link.request(path, data=data, response_callback=response_callback, failed_callback=failed_callback) + return await future + + # get info about group + async def get_info(self): + return await self.request("/api/v1/info") + + # join group + async def join(self, display_name: str): + return await self.request("/api/v1/join", data=json.dumps({ + "display_name": display_name, + }).encode("utf-8")) + + # leave group + async def leave(self): + return await self.request("/api/v1/leave") diff --git a/src/backend/group_chat/group_chat_server.py b/src/backend/group_chat/group_chat_server.py new file mode 100644 index 0000000..7095377 --- /dev/null +++ b/src/backend/group_chat/group_chat_server.py @@ -0,0 +1,119 @@ +import json +import RNS + + +# the interface a data provider should implement for the group server to function +# for example, you could store and retrieve this information from an sqlite database +# however, an interface is provided to allow you to do this however you want +class GroupDataProviderInterface: + + # gets member count of a group + def get_member_count(self, group_destination_hash: bytes) -> int: + raise Exception("Not Implemented") + + # check if a user is a member of a group + def is_member(self, group_destination_hash: bytes, identity_hash: bytes) -> bool: + raise Exception("Not Implemented") + + # adds a member to a group + def add_member(self, group_destination_hash: bytes, identity_hash: bytes, display_name: str): + raise Exception("Not Implemented") + + # removes a member from a group + def remove_member(self, group_destination_hash: bytes, identity_hash: bytes): + raise Exception("Not Implemented") + + +# a group chat server than can handle membership management +class GroupChatServer: + + GROUP_TYPE_PUBLIC = "public" + + def __init__(self, identity: RNS.Identity, data_provider: GroupDataProviderInterface, public_display_name: str, group_type: str): + + self.identity = identity + self.data_provider = data_provider + self.public_display_name = public_display_name + self.group_type = group_type + + # create group destination + self.group_destination = RNS.Destination( + self.identity, + RNS.Destination.IN, + RNS.Destination.SINGLE, + "meshchat", + "group", + ) + + # register request handlers + self.group_destination.register_request_handler(path="/api/v1/info", response_generator=self.on_received_api_v1_info_request, allow=RNS.Destination.ALLOW_ALL) + self.group_destination.register_request_handler(path="/api/v1/join", response_generator=self.on_received_api_v1_join_request, allow=RNS.Destination.ALLOW_ALL) + self.group_destination.register_request_handler(path="/api/v1/leave", response_generator=self.on_received_api_v1_leave_request, allow=RNS.Destination.ALLOW_ALL) + + # announce group destination + def announce(self): + # todo add app data about group: public_display_name, members_count, group_type + self.group_destination.announce() + print("[GroupChatServer] announced destination: " + RNS.prettyhexrep(self.group_destination.hash)) + + # error response format + def success_response(self, message): + return json.dumps({ + "success": message, + }).encode("utf-8") + + # error response format + def error_response(self, message): + return json.dumps({ + "error": message, + }).encode("utf-8") + + # error response for requests that require an identity + def identity_not_provided_error_response(self): + return self.error_response("You must identity to to access this endpoint.") + + # /api/v1/info + def on_received_api_v1_info_request(self, path, data, request_id, remote_identity, requested_at): + return json.dumps({ + "group_type": self.group_type, + "public_display_name": self.public_display_name, + "members_count": self.data_provider.get_member_count(self.group_destination.hash), + }).encode("utf-8") + + # /api/v1/join + def on_received_api_v1_join_request(self, path, data: bytes | None, request_id, remote_identity: RNS.Identity | None, requested_at): + + # ensure user has identified + if remote_identity is None: + return self.identity_not_provided_error_response() + + # attempt to parse data as json + display_name = None + if data is not None: + try: + json_data = json.loads(data.decode("utf-8")) + display_name = json_data["display_name"] or "Anonymous Peer" + except: + print("failed to parse request data as json") + pass + + # ensure user is not already a member + if self.data_provider.is_member(self.group_destination.hash, remote_identity.hash): + return self.error_response("You are already a member of this group") + + if self.group_type == self.GROUP_TYPE_PUBLIC: + self.data_provider.add_member(self.group_destination.hash, remote_identity.hash, display_name) + return self.success_response("You are now a member of this group") + else: + return self.error_response("Unsupported group type") + + # /api/v1/leave + def on_received_api_v1_leave_request(self, path, data, request_id, remote_identity: RNS.Identity | None, requested_at): + + # ensure user has identified + if remote_identity is None: + return self.identity_not_provided_error_response() + + # remove member from group + self.data_provider.remove_member(self.group_destination.hash, remote_identity.hash) + return self.success_response("You are no longer a member of this group") From f1841ad627ff520ce0baf0fa8fd8b1187274f9c6 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Sun, 29 Sep 2024 20:01:49 +1300 Subject: [PATCH 2/6] add app data for group announce --- src/backend/group_chat/group_chat_server.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/backend/group_chat/group_chat_server.py b/src/backend/group_chat/group_chat_server.py index 7095377..5a8b839 100644 --- a/src/backend/group_chat/group_chat_server.py +++ b/src/backend/group_chat/group_chat_server.py @@ -52,8 +52,11 @@ class GroupChatServer: # announce group destination def announce(self): - # todo add app data about group: public_display_name, members_count, group_type - self.group_destination.announce() + self.group_destination.announce(app_data=json.dumps({ + "group_type": self.group_type, + "public_display_name": self.public_display_name, + "members_count": self.data_provider.get_member_count(self.group_destination.hash), + }).encode("utf-8")) print("[GroupChatServer] announced destination: " + RNS.prettyhexrep(self.group_destination.hash)) # error response format From c3cc3409e9961923fa87e728d8c11578ff2de138 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Sun, 29 Sep 2024 20:26:32 +1300 Subject: [PATCH 3/6] implement api to fetch group members --- meshchat.py | 27 +++++++++++++ src/backend/group_chat/group_chat_client.py | 7 ++++ src/backend/group_chat/group_chat_server.py | 42 +++++++++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/meshchat.py b/meshchat.py index 56ead62..1d7e0d6 100644 --- a/meshchat.py +++ b/meshchat.py @@ -111,6 +111,33 @@ class GroupChatDataProvider(GroupDataProviderInterface): (database.GroupMember.group_destination_hash == group.destination_hash) & (database.GroupMember.member_identity_hash == identity_hash.hex())).execute() + # gets members of a group + def get_members(self, group_destination_hash: bytes, page: int | None, limit: int | None): + + # find group + group = self.find_group(group_destination_hash) + if group is None: + raise Exception("Group not found") + + # build group members database query + query = database.GroupMember.select() + + # paginate results + if page is not None and limit is not None: + query = query.paginate(page, limit) + + # order announces latest to oldest + query_results = query.order_by(database.GroupMember.member_display_name.asc()) + + # process members + members = [] + for member in query_results: + members.append({ + "member_identity_hash": member.member_identity_hash, + "member_display_name": member.member_display_name, + }) + + return members class ReticulumMeshChat: diff --git a/src/backend/group_chat/group_chat_client.py b/src/backend/group_chat/group_chat_client.py index acfb5cc..c9a7d18 100644 --- a/src/backend/group_chat/group_chat_client.py +++ b/src/backend/group_chat/group_chat_client.py @@ -95,3 +95,10 @@ class GroupChatClient: # leave group async def leave(self): return await self.request("/api/v1/leave") + + # get group members + async def get_members(self, page: int, limit: int): + return await self.request("/api/v1/members", data=json.dumps({ + "page": page, + "limit": limit, + }).encode("utf-8")) diff --git a/src/backend/group_chat/group_chat_server.py b/src/backend/group_chat/group_chat_server.py index 5a8b839..87cd45f 100644 --- a/src/backend/group_chat/group_chat_server.py +++ b/src/backend/group_chat/group_chat_server.py @@ -23,6 +23,9 @@ class GroupDataProviderInterface: def remove_member(self, group_destination_hash: bytes, identity_hash: bytes): raise Exception("Not Implemented") + # gets members of a group + def get_members(self, group_destination_hash: bytes, page: int | None, limit: int | None): + raise Exception("Not Implemented") # a group chat server than can handle membership management class GroupChatServer: @@ -49,6 +52,7 @@ class GroupChatServer: self.group_destination.register_request_handler(path="/api/v1/info", response_generator=self.on_received_api_v1_info_request, allow=RNS.Destination.ALLOW_ALL) self.group_destination.register_request_handler(path="/api/v1/join", response_generator=self.on_received_api_v1_join_request, allow=RNS.Destination.ALLOW_ALL) self.group_destination.register_request_handler(path="/api/v1/leave", response_generator=self.on_received_api_v1_leave_request, allow=RNS.Destination.ALLOW_ALL) + self.group_destination.register_request_handler(path="/api/v1/members", response_generator=self.on_received_api_v1_members_request, allow=RNS.Destination.ALLOW_ALL) # announce group destination def announce(self): @@ -120,3 +124,41 @@ class GroupChatServer: # remove member from group self.data_provider.remove_member(self.group_destination.hash, remote_identity.hash) return self.success_response("You are no longer a member of this group") + + # /api/v1/members + def on_received_api_v1_members_request(self, path, data: bytes | None, request_id, remote_identity: RNS.Identity | None, requested_at): + + # ensure user has identified + if remote_identity is None: + return self.identity_not_provided_error_response() + + # attempt to parse data as json + page = None + limit = None + if data is not None: + try: + json_data = json.loads(data.decode("utf-8")) + page = json_data["page"] + limit = json_data["limit"] + except: + print("failed to parse request data as json") + pass + + # ensure user is a member + if not self.data_provider.is_member(self.group_destination.hash, remote_identity.hash): + return self.error_response("You are not a member of this group") + + # get group members + database_group_members = self.data_provider.get_members(self.group_destination.hash, page, limit) + + # process group members + group_members = [] + for database_group_member in database_group_members: + group_members.append({ + "identity_hash": database_group_member["member_identity_hash"], + "display_name": database_group_member["member_display_name"], + }) + + return json.dumps({ + "members": group_members, + }).encode("utf-8") From 2a5152ea67c641c5909f0c131f80e708f8146eb8 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Sun, 29 Sep 2024 20:34:04 +1300 Subject: [PATCH 4/6] test group chat client --- src/backend/group_chat/group_chat_client.py | 23 +++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/backend/group_chat/group_chat_client.py b/src/backend/group_chat/group_chat_client.py index c9a7d18..571feeb 100644 --- a/src/backend/group_chat/group_chat_client.py +++ b/src/backend/group_chat/group_chat_client.py @@ -102,3 +102,26 @@ class GroupChatClient: "page": page, "limit": limit, }).encode("utf-8")) + + +# python3 group_chat_client.py +# used for testing group chat client +async def main(): + + # init rns and create random identity + RNS.Reticulum() + identity = RNS.Identity() + + # create group chat client + group_chat_client = GroupChatClient(bytes.fromhex("9862f823bf77a450c44e3a2edccf1dc0"), identity) + + # test requests + print(await group_chat_client.get_info()) + print(await group_chat_client.join("Test Display Name")) + print(await group_chat_client.get_info()) + print(await group_chat_client.get_members(page=1, limit=10)) + print(await group_chat_client.leave()) + print(await group_chat_client.get_info()) + +if __name__ == "__main__": + asyncio.run(main()) From d084bbc73d7ff7071e83bec2dc98e28322ca5bb3 Mon Sep 17 00:00:00 2001 From: liamcottle Date: Sun, 29 Sep 2024 22:59:37 +1300 Subject: [PATCH 5/6] implement sending messages to group --- database.py | 16 ++++++++ meshchat.py | 34 ++++++++++++++++ src/backend/group_chat/group_chat_client.py | 8 ++++ src/backend/group_chat/group_chat_server.py | 45 +++++++++++++++++++-- 4 files changed, 99 insertions(+), 4 deletions(-) diff --git a/database.py b/database.py index 9307658..3e8f7e0 100644 --- a/database.py +++ b/database.py @@ -158,3 +158,19 @@ class GroupMember(BaseModel): # only allow a single row per group_destination_hash/member_identity_hash pair SQL('UNIQUE (group_destination_hash, member_identity_hash)'), ] + + +class GroupMessage(BaseModel): + + id = BigAutoField() + hash = CharField(unique=True) + group_destination_hash = CharField(index=True) + member_identity_hash = CharField() + content = TextField() + + created_at = DateTimeField(default=lambda: datetime.now(timezone.utc)) + updated_at = DateTimeField(default=lambda: datetime.now(timezone.utc)) + + # define table name + class Meta: + table_name = "group_messages" diff --git a/meshchat.py b/meshchat.py index 1d7e0d6..580c8b1 100644 --- a/meshchat.py +++ b/meshchat.py @@ -139,6 +139,39 @@ class GroupChatDataProvider(GroupDataProviderInterface): return members + # save a message sent to the group by the provided identity + def on_message_received(self, group_destination_hash: bytes, identity_hash: bytes, data: dict): + + # find group + group = self.find_group(group_destination_hash) + if group is None: + raise Exception("Group not found") + + # generate a message hash similar to lxmf + message_data_to_hash = b"" + message_data_to_hash += group_destination_hash + message_data_to_hash += identity_hash + message_data_to_hash += msgpack.packb(data) + message_hash = RNS.Identity.full_hash(message_data_to_hash).hex() + + # get content as string + content = None + if "content" in data: + content = str(data["content"]) + + # prepare data to insert or update + data = { + "hash": message_hash, + "group_destination_hash": group.destination_hash, + "member_identity_hash": identity_hash.hex(), + "content": content, + "updated_at": datetime.now(timezone.utc), + } + + # upsert message to database + database.GroupMessage.insert(data).on_conflict(conflict_target=[database.GroupMessage.hash], update=data).execute() + + class ReticulumMeshChat: def __init__(self, identity: RNS.Identity, storage_dir, reticulum_config_dir): @@ -175,6 +208,7 @@ class ReticulumMeshChat: database.CustomDestinationDisplayName, database.Group, database.GroupMember, + database.GroupMessage, database.LxmfMessage, database.LxmfConversationReadState, ]) diff --git a/src/backend/group_chat/group_chat_client.py b/src/backend/group_chat/group_chat_client.py index 571feeb..60b9581 100644 --- a/src/backend/group_chat/group_chat_client.py +++ b/src/backend/group_chat/group_chat_client.py @@ -103,6 +103,13 @@ class GroupChatClient: "limit": limit, }).encode("utf-8")) + # send message + async def send_message(self, content: str): + return await self.request("/api/v1/messages/send", data=json.dumps({ + "timestamp": time.time(), + "content": content, + }).encode("utf-8")) + # python3 group_chat_client.py # used for testing group chat client @@ -120,6 +127,7 @@ async def main(): print(await group_chat_client.join("Test Display Name")) print(await group_chat_client.get_info()) print(await group_chat_client.get_members(page=1, limit=10)) + print(await group_chat_client.send_message("hello world!")) print(await group_chat_client.leave()) print(await group_chat_client.get_info()) diff --git a/src/backend/group_chat/group_chat_server.py b/src/backend/group_chat/group_chat_server.py index 87cd45f..02b53e1 100644 --- a/src/backend/group_chat/group_chat_server.py +++ b/src/backend/group_chat/group_chat_server.py @@ -27,6 +27,11 @@ class GroupDataProviderInterface: def get_members(self, group_destination_hash: bytes, page: int | None, limit: int | None): raise Exception("Not Implemented") + # save a message sent to the group by the provided identity + def on_message_received(self, group_destination_hash: bytes, identity_hash: bytes, data: dict): + raise Exception("Not Implemented") + + # a group chat server than can handle membership management class GroupChatServer: @@ -53,6 +58,7 @@ class GroupChatServer: self.group_destination.register_request_handler(path="/api/v1/join", response_generator=self.on_received_api_v1_join_request, allow=RNS.Destination.ALLOW_ALL) self.group_destination.register_request_handler(path="/api/v1/leave", response_generator=self.on_received_api_v1_leave_request, allow=RNS.Destination.ALLOW_ALL) self.group_destination.register_request_handler(path="/api/v1/members", response_generator=self.on_received_api_v1_members_request, allow=RNS.Destination.ALLOW_ALL) + self.group_destination.register_request_handler(path="/api/v1/messages/send", response_generator=self.on_received_api_v1_messages_send_request, allow=RNS.Destination.ALLOW_ALL) # announce group destination def announce(self): @@ -79,6 +85,10 @@ class GroupChatServer: def identity_not_provided_error_response(self): return self.error_response("You must identity to to access this endpoint.") + # error response for failing to parse request data as json + def request_json_parsing_error_response(self): + return self.error_response("Failed to parse request data as JSON.") + # /api/v1/info def on_received_api_v1_info_request(self, path, data, request_id, remote_identity, requested_at): return json.dumps({ @@ -101,8 +111,7 @@ class GroupChatServer: json_data = json.loads(data.decode("utf-8")) display_name = json_data["display_name"] or "Anonymous Peer" except: - print("failed to parse request data as json") - pass + return self.request_json_parsing_error_response() # ensure user is not already a member if self.data_provider.is_member(self.group_destination.hash, remote_identity.hash): @@ -141,8 +150,7 @@ class GroupChatServer: page = json_data["page"] limit = json_data["limit"] except: - print("failed to parse request data as json") - pass + return self.request_json_parsing_error_response() # ensure user is a member if not self.data_provider.is_member(self.group_destination.hash, remote_identity.hash): @@ -162,3 +170,32 @@ class GroupChatServer: return json.dumps({ "members": group_members, }).encode("utf-8") + + # /api/v1/messages/send + def on_received_api_v1_messages_send_request(self, path, data: bytes | None, request_id, remote_identity: RNS.Identity | None, requested_at): + + # ensure user has identified + if remote_identity is None: + return self.identity_not_provided_error_response() + + # ensure user is a member + if not self.data_provider.is_member(self.group_destination.hash, remote_identity.hash): + return self.error_response("You are not a member of this group") + + # attempt to parse data as json + json_data = None + if data is not None: + try: + json_data = json.loads(data.decode("utf-8")) + except: + return self.request_json_parsing_error_response() + + # todo ensure only expected content is received + # todo ensure timestamp + + # handle received message + self.data_provider.on_message_received(self.group_destination.hash, remote_identity.hash, json_data) + + return json.dumps({ + "success": "Message received", + }).encode("utf-8") From d305daebca7f80f6a2f2bf41687ab32f1679c49e Mon Sep 17 00:00:00 2001 From: liamcottle Date: Sun, 29 Sep 2024 23:20:18 +1300 Subject: [PATCH 6/6] implement fetching messages from group --- meshchat.py | 46 +++++++++++++++++++++ src/backend/group_chat/group_chat_client.py | 9 ++++ src/backend/group_chat/group_chat_server.py | 46 +++++++++++++++++++++ 3 files changed, 101 insertions(+) diff --git a/meshchat.py b/meshchat.py index 580c8b1..fdee616 100644 --- a/meshchat.py +++ b/meshchat.py @@ -139,6 +139,52 @@ class GroupChatDataProvider(GroupDataProviderInterface): return members + # gets messages of a group + def get_messages(self, group_destination_hash: bytes, order: str | None, limit: int | None, after_id: int | None): + + # find group + group = self.find_group(group_destination_hash) + if group is None: + raise Exception("Group not found") + + # build group messages database query + query = database.GroupMessage.select().where(database.GroupMessage.group_destination_hash == group.destination_hash) + + # limit results + if limit is not None: + query = query.limit(limit) + + # order results + if order == "asc": + + # order asc + query = query.order_by(database.GroupMessage.id.asc()) + + # only results after provided id + if after_id is not None: + query = query.where(database.GroupMessage.id > int(after_id)) + + elif order == "desc": + + # order desc + query = query.order_by(database.GroupMessage.id.desc()) + + # only results before provided id + if after_id is not None: + query = query.where(database.GroupMessage.id < int(after_id)) + + # process messages + messages = [] + for message in query: + messages.append({ + "id": message.id, + "member_identity_hash": message.member_identity_hash, + "content": message.content, + "created_at": message.created_at, + }) + + return messages + # save a message sent to the group by the provided identity def on_message_received(self, group_destination_hash: bytes, identity_hash: bytes, data: dict): diff --git a/src/backend/group_chat/group_chat_client.py b/src/backend/group_chat/group_chat_client.py index 60b9581..cfb11fa 100644 --- a/src/backend/group_chat/group_chat_client.py +++ b/src/backend/group_chat/group_chat_client.py @@ -103,6 +103,14 @@ class GroupChatClient: "limit": limit, }).encode("utf-8")) + # get group messages + async def get_messages(self, order: str, limit: int, after_id: int | None): + return await self.request("/api/v1/messages", data=json.dumps({ + "order": order, + "limit": limit, + "after_id": after_id, + }).encode("utf-8")) + # send message async def send_message(self, content: str): return await self.request("/api/v1/messages/send", data=json.dumps({ @@ -128,6 +136,7 @@ async def main(): print(await group_chat_client.get_info()) print(await group_chat_client.get_members(page=1, limit=10)) print(await group_chat_client.send_message("hello world!")) + print(await group_chat_client.get_messages(order="desc", limit=1, after_id=None)) print(await group_chat_client.leave()) print(await group_chat_client.get_info()) diff --git a/src/backend/group_chat/group_chat_server.py b/src/backend/group_chat/group_chat_server.py index 02b53e1..be451d2 100644 --- a/src/backend/group_chat/group_chat_server.py +++ b/src/backend/group_chat/group_chat_server.py @@ -27,6 +27,10 @@ class GroupDataProviderInterface: def get_members(self, group_destination_hash: bytes, page: int | None, limit: int | None): raise Exception("Not Implemented") + # gets messages of a group + def get_messages(self, group_destination_hash: bytes, order: str | None, limit: int | None, after_id: int | None): + raise Exception("Not Implemented") + # save a message sent to the group by the provided identity def on_message_received(self, group_destination_hash: bytes, identity_hash: bytes, data: dict): raise Exception("Not Implemented") @@ -58,6 +62,7 @@ class GroupChatServer: self.group_destination.register_request_handler(path="/api/v1/join", response_generator=self.on_received_api_v1_join_request, allow=RNS.Destination.ALLOW_ALL) self.group_destination.register_request_handler(path="/api/v1/leave", response_generator=self.on_received_api_v1_leave_request, allow=RNS.Destination.ALLOW_ALL) self.group_destination.register_request_handler(path="/api/v1/members", response_generator=self.on_received_api_v1_members_request, allow=RNS.Destination.ALLOW_ALL) + self.group_destination.register_request_handler(path="/api/v1/messages", response_generator=self.on_received_api_v1_messages_request, allow=RNS.Destination.ALLOW_ALL) self.group_destination.register_request_handler(path="/api/v1/messages/send", response_generator=self.on_received_api_v1_messages_send_request, allow=RNS.Destination.ALLOW_ALL) # announce group destination @@ -171,6 +176,47 @@ class GroupChatServer: "members": group_members, }).encode("utf-8") + # /api/v1/messages + def on_received_api_v1_messages_request(self, path, data: bytes | None, request_id, remote_identity: RNS.Identity | None, requested_at): + + # ensure user has identified + if remote_identity is None: + return self.identity_not_provided_error_response() + + # ensure user is a member + if not self.data_provider.is_member(self.group_destination.hash, remote_identity.hash): + return self.error_response("You are not a member of this group") + + # attempt to parse data as json + order = None + limit = None + after_id = None + if data is not None: + try: + json_data = json.loads(data.decode("utf-8")) + order = json_data["order"] + limit = json_data["limit"] + after_id = json_data["after_id"] + except: + return self.request_json_parsing_error_response() + + # get group messages + database_group_messages = self.data_provider.get_messages(self.group_destination.hash, order, limit, after_id) + + # process group messages + group_messages = [] + for database_group_message in database_group_messages: + group_messages.append({ + "id": database_group_message["id"], + "member_identity_hash": database_group_message["member_identity_hash"], + "content": database_group_message["content"], + "created_at": database_group_message["created_at"], + }) + + return json.dumps({ + "messages": group_messages, + }).encode("utf-8") + # /api/v1/messages/send def on_received_api_v1_messages_send_request(self, path, data: bytes | None, request_id, remote_identity: RNS.Identity | None, requested_at):