Split out BTLEUart to separate module

This commit is contained in:
Odd Stråbø 2021-11-05 06:21:01 +01:00
parent f652613ecc
commit 30246b9355
3 changed files with 191 additions and 193 deletions

187
feasycom_ble.py Normal file
View file

@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
import io
import queue
import time
from typing import TYPE_CHECKING, Optional, cast
from bluepy import btle
if TYPE_CHECKING:
from _typeshed import ReadableBuffer, WriteableBuffer
WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb"
READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb"
class BTLEUart(io.RawIOBase):
mac: str
write_endpoint: str
read_endpoint: str
timeout: float
device: Optional[btle.Peripheral] = None
_write_handle: Optional[btle.Characteristic] = None
_read_handle: Optional[btle.Characteristic] = None
delegate: "_QueueDelegate"
_read_buffer: bytearray
class _QueueDelegate(btle.DefaultDelegate):
queue: queue.Queue
handle: Optional[int]
def __init__(self, queue, handle=None):
self.queue = queue
self.handle = handle
def handleNotification(self, cHandle: int, data: bytes):
# print("Notification:", cHandle, "sent data", binascii.b2a_hex(data))
if self.handle is not None and cHandle != self.handle:
return
self.queue.put(data)
def __init__(
self,
mac: str,
write_endpoint: str = WRITE_DEVICE,
read_endpoint: str = READ_DEVICE,
timeout: float = 30,
):
self.mac = mac
self.write_endpoint = write_endpoint
self.read_endpoint = read_endpoint
self.timeout = timeout
self.delegate = self._QueueDelegate(queue.Queue())
self._read_buffer = bytearray()
self._connect()
def _ensure_connected(self):
if self.device is None:
self._connect()
def _poll(self, timeout: float = 0.0001):
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
start = time.time()
left = timeout - (time.time() - start)
while self.device.waitForNotifications(max(left, 0) or 0.0001):
left = timeout - (time.time() - start)
if left < 0:
break
def _connect(self):
try:
del self.device
except Exception:
pass
self.device = btle.Peripheral(self.mac).withDelegate(self.delegate)
self._read_handle = self.device.getCharacteristics(uuid=self.read_endpoint)[0]
# self.delegate.handle = self._read_handle.handle
self._write_handle = self.device.getCharacteristics(uuid=self.write_endpoint)[0]
# print("Handles:", self._read_handle.handle, self._write_handle.handle)
def _read(self, num: Optional[int] = None, timeout: Optional[float] = None):
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
if timeout is None:
timeout = self.timeout
if num is None:
start = time.time()
while not len(self._read_buffer):
left = timeout - (time.time() - start)
if left < 0:
break
self._poll()
try:
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
try:
while True:
self._poll()
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
else:
start = time.time()
while len(self._read_buffer) < num:
left = timeout - (time.time() - start)
if left < 0:
break
self._poll()
try:
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
if num is None:
data = bytes(self._read_buffer.copy())
self._read_buffer.clear()
else:
data = bytes(self._read_buffer[:num])
del self._read_buffer[:num]
return data or None
def readinto(self, buffer: "WriteableBuffer") -> Optional[int]:
data = self._read(len(buffer))
if data is None:
return None
buffer[: len(data)] = data
return len(data)
def readall(self) -> bytes:
return self._read()
def read(
self, size: Optional[int] = None, timeout: Optional[float] = None
) -> Optional[bytes]:
if timeout:
_timeout = self.timeout
self.timeout = timeout
if size is None:
res = super().read()
else:
res = super().read(size)
if timeout:
self.timeout = _timeout
return res
def write(self, b: "ReadableBuffer") -> Optional[int]:
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
if self._write_handle is None:
raise IOError("write_handle not open")
self._write_handle.write(b, withResponse=True)
return len(b)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.device.disconnect()
del self.device
def seekable(self) -> bool:
return False
def readable(self) -> bool:
return True
def writable(self) -> bool:
return True

View file

@ -2,16 +2,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import datetime import datetime
import io
import queue
import struct import struct
import sys import sys
import time import time
from queue import Queue
from typing import TYPE_CHECKING, Optional, cast
if TYPE_CHECKING:
from _typeshed import ReadableBuffer, WriteableBuffer
from bluepy import btle from bluepy import btle
from libscrc import modbus from libscrc import modbus
@ -21,8 +14,6 @@ INTERVAL = 15
# write_service = "0000ffd0-0000-1000-8000-00805f9b34fb" # write_service = "0000ffd0-0000-1000-8000-00805f9b34fb"
# read_service = "0000fff0-0000-1000-8000-00805f9b34fb" # read_service = "0000fff0-0000-1000-8000-00805f9b34fb"
WRITE_DEVICE = "0000ffd1-0000-1000-8000-00805f9b34fb"
READ_DEVICE = "0000fff1-0000-1000-8000-00805f9b34fb"
ACTION_READ = 0x03 ACTION_READ = 0x03
ACTION_WRITE = 0x03 ACTION_WRITE = 0x03
@ -235,189 +226,6 @@ class Delegate(btle.DefaultDelegate):
self.data.clear() self.data.clear()
class BTLEUart(io.RawIOBase):
mac: str
write_endpoint: str
read_endpoint: str
timeout: float
device: Optional[btle.Peripheral] = None
_write_handle: Optional[btle.Characteristic] = None
_read_handle: Optional[btle.Characteristic] = None
delegate: "_QueueDelegate"
_read_buffer: bytearray
class _QueueDelegate(btle.DefaultDelegate):
queue: Queue
handle: Optional[int]
def __init__(self, queue, handle=None):
self.queue = queue
self.handle = handle
def handleNotification(self, cHandle: int, data: bytes):
# print("Notification:", cHandle, "sent data", binascii.b2a_hex(data))
if self.handle is not None and cHandle != self.handle:
return
self.queue.put(data)
def __init__(
self,
mac: str,
write_endpoint: str = WRITE_DEVICE,
read_endpoint: str = READ_DEVICE,
timeout: float = 30,
):
self.mac = mac
self.write_endpoint = write_endpoint
self.read_endpoint = read_endpoint
self.timeout = timeout
self.delegate = self._QueueDelegate(Queue())
self._read_buffer = bytearray()
self._connect()
def _ensure_connected(self):
if self.device is None:
self._connect()
def _poll(self, timeout: float = 0.0001):
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
start = time.time()
left = timeout - (time.time() - start)
while self.device.waitForNotifications(max(left, 0) or 0.0001):
left = timeout - (time.time() - start)
if left < 0:
break
def _connect(self):
try:
del self.device
except Exception:
pass
self.device = btle.Peripheral(self.mac).withDelegate(self.delegate)
self._read_handle = self.device.getCharacteristics(uuid=self.read_endpoint)[0]
# self.delegate.handle = self._read_handle.handle
self._write_handle = self.device.getCharacteristics(uuid=self.write_endpoint)[0]
# print("Handles:", self._read_handle.handle, self._write_handle.handle)
def _read(self, num: Optional[int] = None, timeout: Optional[float] = None):
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
if timeout is None:
timeout = self.timeout
if num is None:
start = time.time()
while not len(self._read_buffer):
left = timeout - (time.time() - start)
if left < 0:
break
self._poll()
try:
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
try:
while True:
self._poll()
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
else:
start = time.time()
while len(self._read_buffer) < num:
left = timeout - (time.time() - start)
if left < 0:
break
self._poll()
try:
self._read_buffer.extend(self.delegate.queue.get_nowait())
except queue.Empty:
pass
if num is None:
data = bytes(self._read_buffer.copy())
self._read_buffer.clear()
else:
data = bytes(self._read_buffer[:num])
del self._read_buffer[:num]
return data or None
def readinto(self, buffer: "WriteableBuffer") -> Optional[int]:
data = self._read(len(buffer))
if data is None:
return None
buffer[: len(data)] = data
return len(data)
def readall(self) -> bytes:
return self._read()
def read(
self, size: Optional[int] = None, timeout: Optional[float] = None
) -> Optional[bytes]:
if timeout:
_timeout = self.timeout
self.timeout = timeout
if size is None:
res = super().read()
else:
res = super().read(size)
if timeout:
self.timeout = _timeout
return res
def write(self, b: "ReadableBuffer") -> Optional[int]:
self._ensure_connected()
if TYPE_CHECKING:
self.device = cast(btle.Peripheral, self.device)
if self._write_handle is None:
raise IOError("write_handle not open")
self._write_handle.write(b, withResponse=True)
return len(b)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.device.disconnect()
del self.device
def seekable(self) -> bool:
return False
def readable(self) -> bool:
return True
def writable(self) -> bool:
return True
# dlgt = Delegate()
# prev = time.time() - INTERVAL
# while True:
# try:
# log("Connecting...")
# with btle.Peripheral(MAC).withDelegate(dlgt) as dev:
# wd = dev.getCharacteristics(uuid=write_device)[0]
def write(fh, data): def write(fh, data):
bdata = bytes(data) bdata = bytes(data)
crc = modbus(bdata) crc = modbus(bdata)
@ -476,6 +284,8 @@ if __name__ == "__main__":
try: try:
log("Connecting...") log("Connecting...")
with btle.Peripheral(MAC).withDelegate(dlgt) as dev: with btle.Peripheral(MAC).withDelegate(dlgt) as dev:
from feasycom_ble import WRITE_DEVICE
wd = dev.getCharacteristics(uuid=WRITE_DEVICE)[0] wd = dev.getCharacteristics(uuid=WRITE_DEVICE)[0]
log("Connected.") log("Connected.")

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from solar_ble import MAC, BTLEUart, construct_request, write from feasycom_ble import BTLEUart
from solar_ble import MAC, construct_request, write
with BTLEUart(MAC, timeout=1) as x: with BTLEUart(MAC, timeout=1) as x: