From be0fddf2fc7a510c90da8c7460fcc4e68bec72de Mon Sep 17 00:00:00 2001 From: kc1awv Date: Sat, 29 Mar 2025 19:31:10 -0400 Subject: [PATCH] Add GPIO keypad driver for 5x5 matrix input --- LXST/Primitives/hardware/keypad_gpio_5x5.py | 131 ++++++++++++++++++++ LXST/Utilities/rnphone.py | 125 +++++++++++++------ 2 files changed, 221 insertions(+), 35 deletions(-) create mode 100644 LXST/Primitives/hardware/keypad_gpio_5x5.py diff --git a/LXST/Primitives/hardware/keypad_gpio_5x5.py b/LXST/Primitives/hardware/keypad_gpio_5x5.py new file mode 100644 index 0000000..5840d25 --- /dev/null +++ b/LXST/Primitives/hardware/keypad_gpio_5x5.py @@ -0,0 +1,131 @@ +import os +import time +import threading +from importlib.util import find_spec +if find_spec("RPi"): import RPi.GPIO as GPIO +else: raise OSError(f"No GPIO module available, cannot use {os.path.basename(__file__)} driver") + +class Event: + UP = 0x00 + DOWN = 0x01 + +class Keypad(): + ROWS = 5 + COLS = 5 + SCAN_INTERVAL_MS = 20 + + LOW = 0x00 + HIGH = 0x01 + + DEFAULT_MAP = [["P", "R", "M", "-", "+"], + ["1", "2", "3", "A", "B"], + ["4", "5", "6", "C", "D"], + ["7", "8", "9", "E", "F"], + ["*", "0", "#", "N", "K"]] + + DEFAULT_ROWPINS = [21, 20, 16, 12, 7] + DEFAULT_COLPINS = [26, 19, 13, 6, 5] + DEFAULT_HOOKPIN = 11 + HOOK_DEBOUNCE_MS = 150 + + def __init__(self, row_pins=None, col_pins=None, key_map=None, callback=None): + if not row_pins == None and (not type(row_pins) == list or len(row_pins) != 5): + raise ValueError("Invalid row pins specification") + if not col_pins == None and (not type(col_pins) == list or len(col_pins) != 5): + raise ValueError("Invalid row pins specification") + + self.row_pins = row_pins or self.DEFAULT_ROWPINS + self.col_pins = col_pins or self.DEFAULT_COLPINS + self.scan_lock = threading.Lock() + self.callback = callback + self.hook_time = 0 + self.hook_pin = None + self.on_hook = True + self.check_hook = False + self.should_run = False + self.ec = Event + self.set_key_map(key_map) + + def enable_hook(self, pin=None): + if pin == None: pin = self.DEFAULT_HOOKPIN + self.hook_pin = pin + GPIO.setup(self.hook_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) + self.key_states["hook"] = False + self.check_hook = True + + def set_key_map(self, key_map): + self.key_map = key_map or self.DEFAULT_MAP + self.key_states = {} + for row in self.key_map: + for key in row: self.key_states[key] = False + + def is_down(self, key): + if not key in self.key_states: return False + else: + return self.key_states[key] + + def is_up(self, key): + if not key in self.key_states: return False + else: + return not self.key_states[key] + + def __job(self): + while self.should_run: + self.__scan() + time.sleep(self.SCAN_INTERVAL_MS/1000) + + def __handle(self, active_keys): + events = [] + for key in self.key_states: + if self.key_states[key] == False: + if key in active_keys: + self.key_states[key] = True + events.append((key, Event.DOWN)) + + elif self.key_states[key] == True: + if not key in active_keys: + self.key_states[key] = False + events.append((key, Event.UP)) + + if callable(self.callback): + for event in events: + self.callback(self, event) + + def __scan(self): + active_keys = [] + for row in range(0, self.ROWS): + GPIO.setup(self.row_pins[row], GPIO.OUT) + GPIO.output(self.row_pins[row], GPIO.HIGH) + for col in range(0, self.COLS): + if GPIO.input(self.col_pins[col]): + active_keys.append(self.key_map[row][col]) + + GPIO.output(self.row_pins[row], GPIO.LOW) + GPIO.setup(self.row_pins[row], GPIO.IN, pull_up_down=GPIO.PUD_OFF) + + if self.check_hook: + on_hook = GPIO.input(self.hook_pin) == GPIO.LOW + + if on_hook: + active_keys.append("hook") + self.hook_time = time.time() + + if self.key_states["hook"] == True and not on_hook: + if time.time()-self.hook_time < self.HOOK_DEBOUNCE_MS/1000: + active_keys.append("hook") + else: + self.hook_time = time.time() + + if len(active_keys) >= 0 and len(active_keys) <= 4: self.__handle(active_keys) + + def start(self): + GPIO.setwarnings(False) + GPIO.setmode(GPIO.BCM) + for row_pin in self.row_pins: GPIO.setup(row_pin, GPIO.OUT) + for col_pin in self.col_pins: GPIO.setup(col_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) + + self.should_run = True + threading.Thread(target=self.__job, daemon=True).start() + + def stop(self): + self.should_run = False diff --git a/LXST/Utilities/rnphone.py b/LXST/Utilities/rnphone.py index 526f57f..67940e9 100644 --- a/LXST/Utilities/rnphone.py +++ b/LXST/Utilities/rnphone.py @@ -25,6 +25,7 @@ class ReticulumTelephone(): KPD_NUMBERS = ["0","1","2","3","4","5","6","7","8","9"] KPD_HEX_ALPHA = ["A","B","C","D","E","F"] KPD_SYMBOLS = ["*","#"] + KPD_COMMANDS = ["P","R","M","N","K","-","+"] RING_TIME = 30 WAIT_TIME = 60 @@ -225,10 +226,15 @@ class ReticulumTelephone(): def enable_keypad(self, driver): if self.service: RNS.log(f"Starting keypad: {driver}", RNS.LOG_DEBUG) + self.keypad_driver = driver if driver == "gpio_4x4": from LXST.Primitives.hardware.keypad_gpio_4x4 import Keypad self.keypad = Keypad(callback=self._keypad_event) self.keypad.start() + elif driver == "gpio_5x5": + from LXST.Primitives.hardware.keypad_gpio_5x5 import Keypad + self.keypad = Keypad(callback=self._keypad_event) + self.keypad.start() else: raise OSError("Unknown keypad driver specified") def enable_hook(self, pin=None): @@ -594,52 +600,101 @@ class ReticulumTelephone(): self.became_available() if self.is_ringing: - answer_events = event[0] == "D" and event[1] == self.keypad.ec.DOWN - answer_events |= event[0] == "hook" and event[1] == self.keypad.ec.UP - if answer_events: - print(f"Answering call from {RNS.prettyhexrep(self.caller.hash)}") - if not self.telephone.answer(self.caller): - print(f"Could not answer call from {RNS.prettyhexrep(self.caller.hash)}") - elif event[0] == "C" and event[1] == self.keypad.ec.DOWN: - print(f"Rejecting call from {RNS.prettyhexrep(self.caller.hash)}") - self.telephone.hangup() + if self.keypad_driver == "gpio_4x4": + answer_events = event[0] == "D" and event[1] == self.keypad.ec.DOWN + answer_events |= event[0] == "hook" and event[1] == self.keypad.ec.UP + if answer_events: + print(f"Answering call from {RNS.prettyhexrep(self.caller.hash)}") + if not self.telephone.answer(self.caller): + print(f"Could not answer call from {RNS.prettyhexrep(self.caller.hash)}") + elif event[0] == "C" and event[1] == self.keypad.ec.DOWN: + print(f"Rejecting call from {RNS.prettyhexrep(self.caller.hash)}") + self.telephone.hangup() + + elif self.keypad_driver == "gpio_5x5": + answer_events = event[0] == "N" and event[1] == self.keypad.ec.DOWN + answer_events |= event[0] == "hook" and event[1] == self.keypad.ec.UP + if answer_events: + print(f"Answering call from {RNS.prettyhexrep(self.caller.hash)}") + if not self.telephone.answer(self.caller): + print(f"Could not answer call from {RNS.prettyhexrep(self.caller.hash)}") + elif event[0] == "K" and event[1] == self.keypad.ec.DOWN: + print(f"Rejecting call from {RNS.prettyhexrep(self.caller.hash)}") + self.telephone.hangup() elif self.is_in_call or self.call_is_connecting: - hangup_events = event[0] == "D" and event[1] == self.keypad.ec.DOWN - hangup_events |= event[0] == "hook" and event[1] == self.keypad.ec.DOWN - if hangup_events: - print(f"Hanging up call with {RNS.prettyhexrep(self.caller.hash)}") - self.telephone.hangup() + if self.keypad_driver == "gpio_4x4": + hangup_events = event[0] == "D" and event[1] == self.keypad.ec.DOWN + hangup_events |= event[0] == "hook" and event[1] == self.keypad.ec.DOWN + if hangup_events: + print(f"Hanging up call with {RNS.prettyhexrep(self.caller.hash)}") + self.telephone.hangup() + + elif self.keypad_driver == "gpio_5x5": + hangup_events = event[0] == "N" and event[1] == self.keypad.ec.DOWN + hangup_events |= event[0] == "hook" and event[1] == self.keypad.ec.DOWN + if hangup_events: + print(f"Hanging up call with {RNS.prettyhexrep(self.caller.hash)}") + self.telephone.hangup() elif self.is_available and self.hw_is_idle: - if event[0] == "A" and event[1] == self.keypad.ec.DOWN: - self.hw_input = ""; self.hw_state = self.HW_STATE_DIAL - self._update_display() + if self.keypad_driver == "gpio_4x4": + if event[0] == "A" and event[1] == self.keypad.ec.DOWN: + self.hw_input = ""; self.hw_state = self.HW_STATE_DIAL + self._update_display() + if event[0] in self.KPD_NUMBERS and event[1] == self.keypad.ec.DOWN: + self.hw_input += event[0]; self.hw_state = self.HW_STATE_DIAL + self._update_display() - if event[0] in self.KPD_NUMBERS and event[1] == self.keypad.ec.DOWN: - self.hw_input += event[0]; self.hw_state = self.HW_STATE_DIAL - self._update_display() + if self.keypad_driver == "gpio_5x5": + if event[0] == "N" and event[1] == self.keypad.ec.DOWN: + self.hw_input = ""; self.hw_state = self.HW_STATE_DIAL + self._update_display() + if event[0] in self.KPD_NUMBERS and event[1] == self.keypad.ec.DOWN: + self.hw_input += event[0]; self.hw_state = self.HW_STATE_DIAL + self._update_display() elif self.is_available and self.hw_is_dialing: dial_event = False - if event[1] == self.keypad.ec.DOWN: - if event[0] in self.KPD_NUMBERS: self.hw_input += event[0] - if event[0] == "A": self.became_available() - if event[0] == "B": self.hw_input = self.hw_input[:-1] - if event[0] == "C": self.hw_input = "" - if event[0] == "D": dial_event = True + if self.keypad_driver == "gpio_4x4": + if event[1] == self.keypad.ec.DOWN: + if event[0] in self.KPD_NUMBERS: self.hw_input += event[0] + if event[0] == "A": self.became_available() + if event[0] == "B": self.hw_input = self.hw_input[:-1] + if event[0] == "C": self.hw_input = "" + if event[0] == "D": dial_event = True - if event[0] == "hook" and event[1] == self.keypad.ec.UP: dial_event = True + if event[0] == "hook" and event[1] == self.keypad.ec.UP: dial_event = True - if dial_event: - for identity_hash in self.aliases: - alias = self.aliases[identity_hash] - if self.hw_input == alias: - self.hw_input = "" - self.hw_state = self.HW_STATE_IDLE - self.dial(identity_hash) + if dial_event: + for identity_hash in self.aliases: + alias = self.aliases[identity_hash] + if self.hw_input == alias: + self.hw_input = "" + self.hw_state = self.HW_STATE_IDLE + self.dial(identity_hash) - self._update_display() + self._update_display() + + if self.keypad_driver == "gpio_5x5": + if event[1] == self.keypad.ec.DOWN: + if event[0] in self.KPD_NUMBERS: self.hw_input += event[0] + if event[0] == "A": self.became_available() + if event[0] == "C": self.hw_input = "" + if event[0] == "K": self.hw_input = self.hw_input[:-1] + if event[0] == "N": dial_event = True + + if event[0] == "hook" and event[1] == self.keypad.ec.UP: dial_event = True + + if dial_event: + for identity_hash in self.aliases: + alias = self.aliases[identity_hash] + if self.hw_input == alias: + self.hw_input = "" + self.hw_state = self.HW_STATE_IDLE + self.dial(identity_hash) + + self._update_display() def sigint_handler(self, signal, frame): self.cleanup()