# -*- coding: utf-8 -*-
"""File-like access to a Bluetooth LE serial-style link, built on bluepy."""
import io
import queue
import time
from typing import TYPE_CHECKING, Optional, cast

from bluepy import btle  # type: ignore

# Default GATT characteristic UUIDs: WRITE_DEVICE is the characteristic that
# outgoing bytes are written to, READ_DEVICE is the one looked up as the read
# endpoint.
WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb"
READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb"

# Overall read timeout (seconds) used when ``timeout`` is None or 0.
_DEFAULT_TIMEOUT: float = 30
# Shortest wait (seconds) ever handed to ``waitForNotifications``.
_MIN_WAIT: float = 0.0001


class BTLEUart(io.RawIOBase):
    """Raw I/O stream over a pair of BLE GATT characteristics.

    Writes go to the characteristic identified by ``write_endpoint``.
    Incoming notification payloads are queued by a bluepy delegate and
    exposed through the usual ``read`` / ``readinto`` / ``readall`` calls.

    The connection is opened in ``__init__`` and transparently reopened by
    any read or write that finds the link missing (for example after leaving
    a ``with`` block).
    """

    mac: str
    write_endpoint: str
    read_endpoint: str
    timeout: float | None
    device: Optional[btle.Peripheral] = None
    _write_handle: Optional[btle.Characteristic] = None
    _read_handle: Optional[btle.Characteristic] = None
    delegate: "_QueueDelegate"
    _read_buffer: bytearray

    class _QueueDelegate(btle.DefaultDelegate):
        """bluepy delegate that pushes every notification payload onto a queue."""

        queue: queue.Queue
        handle: Optional[int]

        def __init__(self, queue, handle=None):
            """Store the target queue and an optional handle to filter on.

            Args:
                queue: Queue that receives each notification payload.
                handle: When not None, only notifications from this
                    characteristic handle are queued.
            """
            super().__init__()
            self.queue = queue
            self.handle = handle

        def handleNotification(self, cHandle: int, data: bytes):
            """Queue ``data`` unless a handle filter is set and does not match."""
            if self.handle is not None and cHandle != self.handle:
                return
            self.queue.put(data)

    def __init__(
        self,
        mac: str,
        write_endpoint: str = WRITE_DEVICE,
        read_endpoint: str = READ_DEVICE,
        timeout: Optional[float] = 30,
    ):
        """Connect to the peripheral at ``mac``.

        Args:
            mac: Address passed to ``btle.Peripheral``.
            write_endpoint: UUID of the characteristic to write to.
            read_endpoint: UUID of the characteristic looked up for reading.
            timeout: Overall time budget in seconds for a read; ``None`` or
                ``0`` falls back to 30 seconds.
        """
        super().__init__()
        self.mac = mac
        self.write_endpoint = write_endpoint
        self.read_endpoint = read_endpoint
        self.timeout = timeout
        self.delegate = self._QueueDelegate(queue.Queue())
        self._read_buffer = bytearray()
        self._connect()

    def _ensure_connected(self) -> btle.Peripheral:
        """Reconnect if there is no active peripheral and return it."""
        if self.device is None:
            self._connect()
        return cast(btle.Peripheral, self.device)

    def _poll(self, timeout: float = 0.0001):
        """Let bluepy dispatch pending notifications to the delegate.

        Keeps waiting for as long as ``waitForNotifications`` reports
        activity and the ``timeout`` budget (seconds) has not run out; stops
        at the first wait that comes back falsy.
        """
        device = self._ensure_connected()
        # Monotonic clock: the budget must not be affected by wall-clock jumps.
        deadline = time.monotonic() + timeout
        remaining = timeout
        while device.waitForNotifications(max(remaining, 0) or _MIN_WAIT):
            remaining = deadline - time.monotonic()
            if remaining < 0:
                break

    def _disconnect(self) -> None:
        """Drop the current peripheral, if any. Safe to call repeatedly."""
        device, self.device = self.device, None
        self._read_handle = None
        self._write_handle = None
        if device is not None:
            device.disconnect()

    def _connect(self):
        """Open a fresh connection and resolve both characteristics.

        Instance state is only updated once both lookups succeed, so a
        failed attempt never leaves a half-initialised connection behind.
        """
        try:
            self._disconnect()
        except (btle.BTLEException, OSError):
            # Best effort: a stale link that fails to close cleanly must not
            # prevent reconnecting.
            pass

        device = btle.Peripheral(self.mac).withDelegate(self.delegate)
        try:
            read_handle = device.getCharacteristics(uuid=self.read_endpoint)[0]
            write_handle = device.getCharacteristics(uuid=self.write_endpoint)[0]
        except Exception:
            device.disconnect()
            raise

        self.device = device
        self._read_handle = read_handle
        self._write_handle = write_handle
        # NOTE: the delegate's handle filter is intentionally left unset, so
        # notifications from every characteristic are queued.

    def _pump_once(self) -> bool:
        """Poll once and move at most one queued packet into the buffer.

        Returns:
            True if a packet was appended, False if the queue was empty.
        """
        self._poll()
        try:
            packet = self.delegate.queue.get_nowait()
        except queue.Empty:
            return False
        self._read_buffer.extend(packet)
        return True

    def _read(self, num: Optional[int] = None):
        """Return buffered/incoming bytes, waiting up to the read timeout.

        Args:
            num: Maximum number of bytes to return. ``None`` means "wait for
                some data, then return everything currently available".

        Returns:
            The bytes read (possibly fewer than ``num`` on timeout), or
            ``None`` when nothing at all was available.
        """
        self._ensure_connected()
        deadline = time.monotonic() + (self.timeout or _DEFAULT_TIMEOUT)
        wanted = 1 if num is None else num

        while len(self._read_buffer) < wanted and time.monotonic() <= deadline:
            self._pump_once()

        if num is None:
            # Sweep up whatever else has already been queued.
            while self._pump_once():
                pass
            data = bytes(self._read_buffer)
            self._read_buffer.clear()
        else:
            data = bytes(self._read_buffer[:num])
            del self._read_buffer[:num]
        return data or None

    def readinto(self, buffer: bytearray | memoryview) -> Optional[int]:  # type: ignore [override]
        """Fill ``buffer`` with up to ``len(buffer)`` bytes.

        Returns:
            The number of bytes stored, or ``None`` if no data arrived
            before the timeout.
        """
        # The annotation is narrower than the base class on purpose: the
        # generic writable-buffer type is not Sized and ``bytes`` is
        # read-only, whereas bytearray and memoryview support both ``len``
        # and slice assignment.
        data = self._read(len(buffer))
        if data is None:
            return None
        buffer[: len(data)] = data
        return len(data)

    def readall(self) -> Optional[bytes]:  # type: ignore [override]
        """Return everything currently available, or ``None`` if nothing is."""
        return self._read()

    def read(self, size: Optional[int] = None) -> Optional[bytes]:
        """Read up to ``size`` bytes; ``None`` behaves like ``readall``."""
        # RawIOBase.read does not accept None, only its -1 default.
        return super().read(-1 if size is None else size)

    def write(self, b: bytes | bytearray | memoryview) -> Optional[int]:  # type: ignore [override]
        """Write ``b`` to the write characteristic (with response).

        Returns:
            ``len(b)``.

        Raises:
            IOError: If the write characteristic has not been resolved.
        """
        self._ensure_connected()
        if self._write_handle is None:
            raise IOError("write_handle not open")
        self._write_handle.write(b, withResponse=True)
        return len(b)

    def close(self) -> None:
        """Disconnect from the peripheral and mark the stream as closed."""
        try:
            self._disconnect()
        finally:
            super().close()

    def __enter__(self):
        """Return the stream itself; the link is already open."""
        return self

    def __exit__(self, type, value, traceback):
        """Disconnect on leaving the ``with`` block.

        The stream is not marked closed, so later I/O reconnects.
        """
        self._disconnect()

    def seekable(self) -> bool:
        """The stream does not support seeking."""
        return False

    def readable(self) -> bool:
        """The stream supports reading."""
        return True

    def writable(self) -> bool:
        """The stream supports writing."""
        return True