From 30246b9355b04b2dafc3f6f69b4d57bb91c5ade2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Odd=20Str=C3=A5b=C3=B8?= Date: Fri, 5 Nov 2021 06:21:01 +0100 Subject: [PATCH] Split out BTLEUart to separate module --- feasycom_ble.py | 187 ++++++++++++++++++++++++++++++++++++++++++++++ solar_ble.py | 194 +----------------------------------------------- test_bleuart.py | 3 +- 3 files changed, 191 insertions(+), 193 deletions(-) create mode 100644 feasycom_ble.py diff --git a/feasycom_ble.py b/feasycom_ble.py new file mode 100644 index 0000000..2f7f262 --- /dev/null +++ b/feasycom_ble.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +import io +import queue +import time +from typing import TYPE_CHECKING, Optional, cast + +from bluepy import btle + +if TYPE_CHECKING: + from _typeshed import ReadableBuffer, WriteableBuffer + + +WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb" +READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb" + + +class BTLEUart(io.RawIOBase): + mac: str + write_endpoint: str + read_endpoint: str + timeout: float + + device: Optional[btle.Peripheral] = None + _write_handle: Optional[btle.Characteristic] = None + _read_handle: Optional[btle.Characteristic] = None + delegate: "_QueueDelegate" + _read_buffer: bytearray + + class _QueueDelegate(btle.DefaultDelegate): + queue: queue.Queue + handle: Optional[int] + + def __init__(self, queue, handle=None): + self.queue = queue + self.handle = handle + + def handleNotification(self, cHandle: int, data: bytes): + # print("Notification:", cHandle, "sent data", binascii.b2a_hex(data)) + + if self.handle is not None and cHandle != self.handle: + return + + self.queue.put(data) + + def __init__( + self, + mac: str, + write_endpoint: str = WRITE_DEVICE, + read_endpoint: str = READ_DEVICE, + timeout: float = 30, + ): + self.mac = mac + self.write_endpoint = write_endpoint + self.read_endpoint = read_endpoint + self.timeout = timeout + + self.delegate = self._QueueDelegate(queue.Queue()) + self._read_buffer = bytearray() + + self._connect() + + def _ensure_connected(self): + if self.device is None: + self._connect() + + def _poll(self, timeout: float = 0.0001): + self._ensure_connected() + if TYPE_CHECKING: + self.device = cast(btle.Peripheral, self.device) + + start = time.time() + left = timeout - (time.time() - start) + while self.device.waitForNotifications(max(left, 0) or 0.0001): + left = timeout - (time.time() - start) + if left < 0: + break + + def _connect(self): + try: + del self.device + except Exception: + pass + self.device = btle.Peripheral(self.mac).withDelegate(self.delegate) + self._read_handle = self.device.getCharacteristics(uuid=self.read_endpoint)[0] + # self.delegate.handle = self._read_handle.handle + self._write_handle = self.device.getCharacteristics(uuid=self.write_endpoint)[0] + # print("Handles:", self._read_handle.handle, self._write_handle.handle) + + def _read(self, num: Optional[int] = None, timeout: Optional[float] = None): + self._ensure_connected() + if TYPE_CHECKING: + self.device = cast(btle.Peripheral, self.device) + + if timeout is None: + timeout = self.timeout + + if num is None: + start = time.time() + while not len(self._read_buffer): + left = timeout - (time.time() - start) + if left < 0: + break + self._poll() + try: + self._read_buffer.extend(self.delegate.queue.get_nowait()) + except queue.Empty: + pass + try: + while True: + self._poll() + self._read_buffer.extend(self.delegate.queue.get_nowait()) + + except queue.Empty: + pass + else: + start = time.time() + while len(self._read_buffer) < num: + left = timeout - (time.time() - start) + if left < 0: + break + self._poll() + try: + self._read_buffer.extend(self.delegate.queue.get_nowait()) + except queue.Empty: + pass + + if num is None: + data = bytes(self._read_buffer.copy()) + self._read_buffer.clear() + else: + data = bytes(self._read_buffer[:num]) + del self._read_buffer[:num] + return data or None + + def readinto(self, buffer: "WriteableBuffer") -> Optional[int]: + data = self._read(len(buffer)) + + if data is None: + return None + + buffer[: len(data)] = data + return len(data) + + def readall(self) -> bytes: + return self._read() + + def read( + self, size: Optional[int] = None, timeout: Optional[float] = None + ) -> Optional[bytes]: + if timeout: + _timeout = self.timeout + self.timeout = timeout + + if size is None: + res = super().read() + else: + res = super().read(size) + + if timeout: + self.timeout = _timeout + return res + + def write(self, b: "ReadableBuffer") -> Optional[int]: + self._ensure_connected() + if TYPE_CHECKING: + self.device = cast(btle.Peripheral, self.device) + + if self._write_handle is None: + raise IOError("write_handle not open") + self._write_handle.write(b, withResponse=True) + return len(b) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.device.disconnect() + del self.device + + def seekable(self) -> bool: + return False + + def readable(self) -> bool: + return True + + def writable(self) -> bool: + return True diff --git a/solar_ble.py b/solar_ble.py index 7979f8b..8f5e03c 100755 --- a/solar_ble.py +++ b/solar_ble.py @@ -2,16 +2,9 @@ # -*- coding: utf-8 -*- import datetime -import io -import queue import struct import sys import time -from queue import Queue -from typing import TYPE_CHECKING, Optional, cast - -if TYPE_CHECKING: - from _typeshed import ReadableBuffer, WriteableBuffer from bluepy import btle from libscrc import modbus @@ -21,8 +14,6 @@ INTERVAL = 15 # write_service = "0000ffd0-0000-1000-8000-00805f9b34fb" # read_service = "0000fff0-0000-1000-8000-00805f9b34fb" -WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb" -READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb" ACTION_READ = 0x03 ACTION_WRITE = 0x03 @@ -235,189 +226,6 @@ class Delegate(btle.DefaultDelegate): self.data.clear() -class BTLEUart(io.RawIOBase): - mac: str - write_endpoint: str - read_endpoint: str - timeout: float - - device: Optional[btle.Peripheral] = None - _write_handle: Optional[btle.Characteristic] = None - _read_handle: Optional[btle.Characteristic] = None - delegate: "_QueueDelegate" - _read_buffer: bytearray - - class _QueueDelegate(btle.DefaultDelegate): - queue: Queue - handle: Optional[int] - - def __init__(self, queue, handle=None): - self.queue = queue - self.handle = handle - - def handleNotification(self, cHandle: int, data: bytes): - # print("Notification:", cHandle, "sent data", binascii.b2a_hex(data)) - - if self.handle is not None and cHandle != self.handle: - return - - self.queue.put(data) - - def __init__( - self, - mac: str, - write_endpoint: str = WRITE_DEVICE, - read_endpoint: str = READ_DEVICE, - timeout: float = 30, - ): - self.mac = mac - self.write_endpoint = write_endpoint - self.read_endpoint = read_endpoint - self.timeout = timeout - - self.delegate = self._QueueDelegate(Queue()) - self._read_buffer = bytearray() - - self._connect() - - def _ensure_connected(self): - if self.device is None: - self._connect() - - def _poll(self, timeout: float = 0.0001): - self._ensure_connected() - if TYPE_CHECKING: - self.device = cast(btle.Peripheral, self.device) - - start = time.time() - left = timeout - (time.time() - start) - while self.device.waitForNotifications(max(left, 0) or 0.0001): - left = timeout - (time.time() - start) - if left < 0: - break - - def _connect(self): - try: - del self.device - except Exception: - pass - self.device = btle.Peripheral(self.mac).withDelegate(self.delegate) - self._read_handle = self.device.getCharacteristics(uuid=self.read_endpoint)[0] - # self.delegate.handle = self._read_handle.handle - self._write_handle = self.device.getCharacteristics(uuid=self.write_endpoint)[0] - # print("Handles:", self._read_handle.handle, self._write_handle.handle) - - def _read(self, num: Optional[int] = None, timeout: Optional[float] = None): - self._ensure_connected() - if TYPE_CHECKING: - self.device = cast(btle.Peripheral, self.device) - - if timeout is None: - timeout = self.timeout - - if num is None: - start = time.time() - while not len(self._read_buffer): - left = timeout - (time.time() - start) - if left < 0: - break - self._poll() - try: - self._read_buffer.extend(self.delegate.queue.get_nowait()) - except queue.Empty: - pass - try: - while True: - self._poll() - self._read_buffer.extend(self.delegate.queue.get_nowait()) - - except queue.Empty: - pass - else: - start = time.time() - while len(self._read_buffer) < num: - left = timeout - (time.time() - start) - if left < 0: - break - self._poll() - try: - self._read_buffer.extend(self.delegate.queue.get_nowait()) - except queue.Empty: - pass - - if num is None: - data = bytes(self._read_buffer.copy()) - self._read_buffer.clear() - else: - data = bytes(self._read_buffer[:num]) - del self._read_buffer[:num] - return data or None - - def readinto(self, buffer: "WriteableBuffer") -> Optional[int]: - data = self._read(len(buffer)) - - if data is None: - return None - - buffer[: len(data)] = data - return len(data) - - def readall(self) -> bytes: - return self._read() - - def read( - self, size: Optional[int] = None, timeout: Optional[float] = None - ) -> Optional[bytes]: - if timeout: - _timeout = self.timeout - self.timeout = timeout - - if size is None: - res = super().read() - else: - res = super().read(size) - - if timeout: - self.timeout = _timeout - return res - - def write(self, b: "ReadableBuffer") -> Optional[int]: - self._ensure_connected() - if TYPE_CHECKING: - self.device = cast(btle.Peripheral, self.device) - - if self._write_handle is None: - raise IOError("write_handle not open") - self._write_handle.write(b, withResponse=True) - return len(b) - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.device.disconnect() - del self.device - - def seekable(self) -> bool: - return False - - def readable(self) -> bool: - return True - - def writable(self) -> bool: - return True - - # dlgt = Delegate() - - # prev = time.time() - INTERVAL - - # while True: - # try: - # log("Connecting...") - # with btle.Peripheral(MAC).withDelegate(dlgt) as dev: - # wd = dev.getCharacteristics(uuid=write_device)[0] - - def write(fh, data): bdata = bytes(data) crc = modbus(bdata) @@ -476,6 +284,8 @@ if __name__ == "__main__": try: log("Connecting...") with btle.Peripheral(MAC).withDelegate(dlgt) as dev: + from feasycom_ble import WRITE_DEVICE + wd = dev.getCharacteristics(uuid=WRITE_DEVICE)[0] log("Connected.") diff --git a/test_bleuart.py b/test_bleuart.py index 7af93d2..0f9fb21 100644 --- a/test_bleuart.py +++ b/test_bleuart.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -from solar_ble import MAC, BTLEUart, construct_request, write +from feasycom_ble import BTLEUart +from solar_ble import MAC, construct_request, write with BTLEUart(MAC, timeout=1) as x: