From d60a07dfccfefb5fc41335778c88d6f7666b41d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Odd=20Str=C3=A5b=C3=B8?= Date: Thu, 4 Nov 2021 01:52:48 +0100 Subject: [PATCH] Implement wrapper class for the BTLE UART --- solar_ble.py | 180 +++++++++++++++++++++++++++++++++++++++++++++++- test_bleuart.py | 16 +++++ 2 files changed, 193 insertions(+), 3 deletions(-) create mode 100644 test_bleuart.py diff --git a/solar_ble.py b/solar_ble.py index 5cebb32..eb4dd3f 100755 --- a/solar_ble.py +++ b/solar_ble.py @@ -1,9 +1,17 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- + import datetime +import io +import queue import struct import sys import time +from queue import Queue +from typing import TYPE_CHECKING, Optional, cast + +if TYPE_CHECKING: + from _typeshed import ReadableBuffer, WriteableBuffer from bluepy import btle from libscrc import modbus @@ -13,8 +21,8 @@ INTERVAL = 15 # write_service = "0000ffd0-0000-1000-8000-00805f9b34fb" # read_service = "0000fff0-0000-1000-8000-00805f9b34fb" -write_device = "0000ffd1-0000-1000-8000-00805f9b34fb" -# read_device = "0000fff1-0000-1000-8000-00805f9b34fb" +WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb" +READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb" # get(255, 12, 2) # "ff 03 00 0c 00 02" @@ -222,6 +230,172 @@ class Delegate(btle.DefaultDelegate): self.data.clear() +class BTLEUart(io.RawIOBase): + mac: str + write_endpoint: str + read_endpoint: str + timeout: float + + device: Optional[btle.Peripheral] = None + _write_handle: Optional[btle.Characteristic] = None + _read_handle: Optional[btle.Characteristic] = None + delegate: "_QueueDelegate" + _read_buffer: bytearray + + class _QueueDelegate(btle.DefaultDelegate): + queue: Queue + handle: Optional[int] + + def __init__(self, queue, handle=None): + self.queue = queue + self.handle = handle + + def handleNotification(self, cHandle: int, data: bytes): + # print("Notification:", cHandle, "sent data", binascii.b2a_hex(data)) + + if self.handle is not None and cHandle != self.handle: + return + + self.queue.put(data) + + def __init__( + self, + mac: str, + write_endpoint: str = WRITE_DEVICE, + read_endpoint: str = READ_DEVICE, + timeout: float = 30, + ): + self.mac = mac + self.write_endpoint = write_endpoint + self.read_endpoint = read_endpoint + self.timeout = timeout + + self.delegate = self._QueueDelegate(Queue()) + self._read_buffer = bytearray() + + self._connect() + + def _ensure_connected(self): + if self.device is None: + self._connect() + + def _poll(self, timeout: float = 0.0001): + self._ensure_connected() + if TYPE_CHECKING: + self.device = cast(btle.Peripheral, self.device) + + start = time.time() + left = timeout - (time.time() - start) + while self.device.waitForNotifications(max(left, 0) or 0.0001): + left = timeout - (time.time() - start) + if left < 0: + break + + def _connect(self): + try: + del self.device + except Exception: + pass + self.device = btle.Peripheral(self.mac).withDelegate(self.delegate) + self._read_handle = self.device.getCharacteristics(uuid=self.read_endpoint)[0] + # self.delegate.handle = self._read_handle.handle + self._write_handle = self.device.getCharacteristics(uuid=self.write_endpoint)[0] + # print("Handles:", self._read_handle.handle, self._write_handle.handle) + + def _read(self, num: Optional[int] = None, timeout: Optional[float] = None): + self._ensure_connected() + if TYPE_CHECKING: + self.device = cast(btle.Peripheral, self.device) + + if timeout is None: + timeout = self.timeout + + if num is None: + start = time.time() + while not len(self._read_buffer): + left = timeout - (time.time() - start) + if left < 0: + break + self.device.waitForNotifications(0.1) + try: + self._read_buffer.extend(self.delegate.queue.get_nowait()) + except queue.Empty: + pass + try: + while True: + self.device.waitForNotifications(0.01) + self._read_buffer.extend(self.delegate.queue.get_nowait()) + + except queue.Empty: + pass + else: + start = time.time() + while len(self._read_buffer) < num: + left = timeout - (time.time() - start) + if left < 0: + break + self.device.waitForNotifications(0.1) + try: + self._read_buffer.extend(self.delegate.queue.get_nowait()) + except queue.Empty: + pass + + if num is None: + data = bytes(self._read_buffer.copy()) + self._read_buffer.clear() + else: + data = bytes(self._read_buffer[:num]) + del self._read_buffer[:num] + return data or None + + def readinto(self, buffer: "WriteableBuffer") -> Optional[int]: + data = self._read(len(buffer)) + + if data is None: + return None + + buffer[: len(data)] = data + return len(data) + + def readall(self) -> bytes: + return self._read() + + def write(self, b: "ReadableBuffer") -> Optional[int]: + self._ensure_connected() + if TYPE_CHECKING: + self.device = cast(btle.Peripheral, self.device) + + if self._write_handle is None: + raise IOError("write_handle not open") + self._write_handle.write(b, withResponse=True) + return len(b) + + def __enter__(self): + return self + + def __exit__(self): + self.device.disconnect() + + def seekable(self) -> bool: + return False + + def readable(self) -> bool: + return True + + def writable(self) -> bool: + return True + + # dlgt = Delegate() + + # prev = time.time() - INTERVAL + + # while True: + # try: + # log("Connecting...") + # with btle.Peripheral(MAC).withDelegate(dlgt) as dev: + # wd = dev.getCharacteristics(uuid=write_device)[0] + + def write(fh, data): bdata = bytes(data) crc = modbus(bdata) @@ -279,7 +453,7 @@ if __name__ == "__main__": try: log("Connecting...") with btle.Peripheral(MAC).withDelegate(dlgt) as dev: - wd = dev.getCharacteristics(uuid=write_device)[0] + wd = dev.getCharacteristics(uuid=WRITE_DEVICE)[0] log("Connected.") diff --git a/test_bleuart.py b/test_bleuart.py new file mode 100644 index 0000000..a8321a4 --- /dev/null +++ b/test_bleuart.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +from solar_ble import MAC, BTLEUart, construct_request, write + +x = BTLEUart(MAC) + +print(x) +write(x, construct_request(0x0C, 8)) + +print(x.read(3)) +print(x.read(8 * 2)) +print(x.read(2)) + +x.timeout = 10 + +print(x.read()) +print(x.read(1))